Hi, I decided to come back to this problem and find a better solution than the 
random sleeps and retries.
We tried using optimistic locking, but it doesn't seem to help at all.
Here's a test program that uses two replicated cache instances, both writing 
to the same node:


  | package CacheRepl;
  | 
  | import java.util.concurrent.atomic.AtomicInteger;
  | 
  | import javax.transaction.UserTransaction;
  | 
  | import org.apache.log4j.Logger;
  | import org.jboss.cache.Cache;
  | import org.jboss.cache.DefaultCacheFactory;
  | import org.jboss.cache.Fqn;
  | import org.jboss.cache.transaction.DummyTransactionManager;
  | import org.jboss.cache.transaction.DummyUserTransaction;
  | 
  | public class CacheThread extends Thread {
  |     private static final Logger LOG = Logger.getLogger("test");
  |     private static final int THREADS = 2;
  |     private static final int REPEAT = 20;
  |     private static final int SLEEP_TIME = 10;
  |     
  |     protected int threadId;
  |     protected final Cache<Object, Object> cache;
  |     
  |     public CacheThread(final int threadId) {
  |             this.threadId = threadId;
  |             cache = DefaultCacheFactory.getInstance().createCache("replSync-service.xml", true);
  |     }
  |     
  |     private boolean doTransaction() {
  |             final UserTransaction tx = new DummyUserTransaction(DummyTransactionManager.getInstance());
  |             try {
  |                     tx.begin();
  |                     cache.put(new Fqn<Object>("node"), "key", "value" + threadId);
  |                     tx.commit();
  |                 return true;
  |             } catch (Exception e) {
  |                     LOG.error("transaction failed", e);
  |                     try {
  |                             tx.rollback();
  |                     } catch (Exception e1) {
  |                             LOG.warn("rollback failed", e1);
  |                     }
  |                     return false;
  |             }
  |     }
  |     
  |     @Override
  |     public void run() {
  |             int failed = 0;
  |             for (int i = 0; i < REPEAT; ++i) {
  |                     try {
  |                             Thread.sleep(SLEEP_TIME);
  |                     } catch (Exception e) {
  |                             e.printStackTrace();
  |                     }
  |                     if (!doTransaction()) {
  |                             failed++;
  |                     }
  |             }
  |             System.out.println("Thread" + threadId + ": " + failed + " failures");
  |     }
  |     
  |     private void close() {
  |             cache.stop();
  |     }
  |     
  |     public static void main(String[] args) throws InterruptedException {
  |             final CacheThread[] threads = new CacheThread[THREADS];
  |             for (int t = 0; t < THREADS; t++) {
  |                     threads[t] = new CacheThread(t);
  |             }
  |             for (int t = 0; t < THREADS; t++) {
  |                     threads[t].start();
  |             }
  |             System.out.println("started");
  |             for (int t = 0; t < THREADS; t++) {
  |                     threads[t].join();
  |             }
  |             for (int t = 0; t < THREADS; t++) {
  |                     threads[t].close();
  |             }
  |     }
  | }
  | 

and the cache configuration:


  | <server>
  |     <mbean code="org.jboss.cache.jmx.CacheJmxWrapper"
  |             name="jboss.cache:service=TreeCache">
  | 
  |         <depends>jboss:service=Naming</depends>
  |         <depends>jboss:service=TransactionManager</depends>
  | 
  |       <attribute name="TransactionManagerLookupClass">org.jboss.cache.transaction.DummyTransactionManagerLookup</attribute>
  |       <!--
  |                 Node locking scheme:
  |             OPTIMISTIC
  |             PESSIMISTIC (default)
  |       -->
  |       <attribute name="NodeLockingScheme">OPTIMISTIC</attribute>
  |         <!--
  |             Isolation level : SERIALIZABLE
  |                               REPEATABLE_READ (default)
  |                               READ_COMMITTED
  |                               READ_UNCOMMITTED
  |                               NONE
  |         -->
  |         <attribute name="IsolationLevel">READ_COMMITTED</attribute>
  | 
  |         <attribute name="CacheMode">REPL_SYNC</attribute>
  | 
  |         <attribute name="ClusterName">Test cache</attribute>
  | 
  |         <attribute name="ClusterConfig">
  |                     <config>
  |                             <UDP bind_addr="10.0.0.226"
  |                                 mcast_addr="228.10.10.10"
  |                                     mcast_port="45588"
  |                                     tos="8"
  |                                     ucast_recv_buf_size="20000000"
  |                                     ucast_send_buf_size="640000"
  |                                     mcast_recv_buf_size="25000000"
  |                                     mcast_send_buf_size="640000"
  |                                     loopback="false"
  |                                     discard_incompatible_packets="true"
  |                                     max_bundle_size="64000"
  |                                     max_bundle_timeout="30"
  |                                     use_incoming_packet_handler="true"
  |                                     ip_ttl="2"
  |                                     enable_bundling="false"
  |                                     enable_diagnostics="true"
  |                                     use_concurrent_stack="true"
  |                                     thread_naming_pattern="pl"
  |                                     thread_pool.enabled="true"
  |                                     thread_pool.min_threads="1"
  |                                     thread_pool.max_threads="25"
  |                                     thread_pool.keep_alive_time="30000"
  |                                     thread_pool.queue_enabled="true"
  |                                     thread_pool.queue_max_size="10"
  |                                     thread_pool.rejection_policy="Run"
  |                                     oob_thread_pool.enabled="true"
  |                                     oob_thread_pool.min_threads="1"
  |                                     oob_thread_pool.max_threads="4"
  |                                     oob_thread_pool.keep_alive_time="10000"
  |                                     oob_thread_pool.queue_enabled="true"
  |                                     oob_thread_pool.queue_max_size="10"
  |                                     oob_thread_pool.rejection_policy="Run"/>
  |                             <PING timeout="2000" num_initial_members="3"/>
  |                             <MERGE2 max_interval="30000" min_interval="10000"/>
  |                             <FD_SOCK/>
  |                             <FD timeout="10000" max_tries="5" shun="true"/>
  |                             <VERIFY_SUSPECT timeout="1500"/>
  |                             <pbcast.NAKACK max_xmit_size="60000"
  |                                     use_mcast_xmit="false" gc_lag="0"
  |                                     retransmit_timeout="300,600,1200,2400,4800"
  |                                     discard_delivered_msgs="true"/>
  |                             <UNICAST timeout="300,600,1200,2400,3600"/>
  |                             <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
  |                                     max_bytes="400000"/>
  |                             <pbcast.GMS print_local_addr="true" join_timeout="5000"
  |                                     join_retry_timeout="2000" shun="false"
  |                                     view_bundling="true" view_ack_collection_timeout="5000"/>
  |                             <FRAG2 frag_size="60000"/>
  |                             <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
  |                             <!-- <pbcast.STATE_TRANSFER/> -->
  |                             <pbcast.FLUSH timeout="0"/>
  |                     </config>
  |             </attribute>
  | 
  |         <attribute name="FetchInMemoryState">true</attribute>
  |         <attribute name="StateRetrievalTimeout">15000</attribute>
  |         <attribute name="SyncReplTimeout">15000</attribute>
  |         <attribute name="LockAcquisitionTimeout">1000</attribute>
  |         
  |         <attribute name="UseRegionBasedMarshalling">true</attribute>
  |         <attribute name="TransactionTimeout">300</attribute>
  |     </mbean>
  | 
  | </server>
  | 

With both pessimistic and optimistic locking, there are failures every time.
Here are some example exceptions:

- optimistic locking:

  | org.jboss.cache.lock.TimeoutException: failure acquiring lock: fqn=/, caller=GlobalTransaction:<10.0.0.226:32932>:1, lock=write owner=GlobalTransaction:<10.0.0.226:32931>:2 ([EMAIL PROTECTED])
  |     at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:528)
  |     at org.jboss.cache.interceptors.OptimisticLockingInterceptor.lockNodes(OptimisticLockingInterceptor.java:119)
  |     at org.jboss.cache.interceptors.OptimisticLockingInterceptor.invoke(OptimisticLockingInterceptor.java:52)
  |     at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  |     at org.jboss.cache.interceptors.OptimisticReplicationInterceptor.invoke(OptimisticReplicationInterceptor.java:73)
  |     at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  |     at org.jboss.cache.interceptors.NotificationInterceptor.invoke(NotificationInterceptor.java:32)
  |     at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  |     at org.jboss.cache.interceptors.TxInterceptor.handleOptimisticPrepare(TxInterceptor.java:374)
  |     at org.jboss.cache.interceptors.TxInterceptor.handleRemotePrepare(TxInterceptor.java:250)
  |     at org.jboss.cache.interceptors.TxInterceptor.invoke(TxInterceptor.java:100)
  |     at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  |     at org.jboss.cache.interceptors.CacheMgmtInterceptor.invoke(CacheMgmtInterceptor.java:123)
  |     at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  |     at org.jboss.cache.interceptors.InvocationContextInterceptor.invoke(InvocationContextInterceptor.java:62)
  |     at org.jboss.cache.CacheImpl.invokeMethod(CacheImpl.java:3939)
  |     at org.jboss.cache.CacheImpl._replicate(CacheImpl.java:2853)
  |     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  |     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  |     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  |     at java.lang.reflect.Method.invoke(Method.java:585)
  |     at org.jgroups.blocks.MethodCall.invoke(MethodCall.java:330)
  |     at org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher.handle(InactiveRegionAwareRpcDispatcher.java:77)
  |     at org.jgroups.blocks.RequestCorrelator.handleRequest(RequestCorrelator.java:624)
  |     at org.jgroups.blocks.RequestCorrelator.receiveMessage(RequestCorrelator.java:533)
  |     at org.jgroups.blocks.RequestCorrelator.receive(RequestCorrelator.java:365)
  |     at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:736)
  |     at org.jgroups.JChannel.up(JChannel.java:1063)
  |     at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:325)
  |     at org.jgroups.protocols.pbcast.FLUSH.up(FLUSH.java:406)
  |     at org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER.up(STREAMING_STATE_TRANSFER.java:255)
  |     at org.jgroups.protocols.FRAG2.up(FRAG2.java:197)
  |     at org.jgroups.protocols.pbcast.GMS.up(GMS.java:722)
  |     at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:234)
  |     at org.jgroups.protocols.UNICAST.up(UNICAST.java:263)
  |     at org.jgroups.protocols.pbcast.NAKACK.handleMessage(NAKACK.java:720)
  |     at org.jgroups.protocols.pbcast.NAKACK.up(NAKACK.java:546)
  |     at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:167)
  |     at org.jgroups.protocols.FD.up(FD.java:322)
  |     at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:298)
  |     at org.jgroups.protocols.MERGE2.up(MERGE2.java:145)
  |     at org.jgroups.protocols.Discovery.up(Discovery.java:220)
  |     at org.jgroups.protocols.TP$IncomingPacket.handleMyMessage(TP.java:1486)
  |     at org.jgroups.protocols.TP$IncomingPacket.run(TP.java:1440)
  |     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
  |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
  |     at java.lang.Thread.run(Thread.java:595)
  | Caused by: org.jboss.cache.lock.TimeoutException: write lock for / could not be acquired after 1000 ms. Locks: Read lock owners: []
  | Write lock owner: GlobalTransaction:<10.0.0.226:32931>:2
  |  (caller=GlobalTransaction:<10.0.0.226:32932>:1, lock info: write owner=GlobalTransaction:<10.0.0.226:32931>:2 ([EMAIL PROTECTED]))
  |     at org.jboss.cache.lock.IdentityLock.acquireWriteLock0(IdentityLock.java:244)
  |     at org.jboss.cache.lock.IdentityLock.acquireWriteLock(IdentityLock.java:167)
  |     at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:497)
  |     ... 46 more
  | 
  | javax.transaction.RollbackException: outcome is false status: 1
  |     at org.jboss.cache.transaction.DummyTransaction.commit(DummyTransaction.java:75)
  |     at org.jboss.cache.transaction.DummyBaseTransactionManager.commit(DummyBaseTransactionManager.java:78)
  |     at org.jboss.cache.transaction.DummyUserTransaction.commit(DummyUserTransaction.java:77)
  |     at CacheRepl.CacheThread.doTransaction(CacheThread.java:31)
  |     at CacheRepl.CacheThread.run(CacheThread.java:53)
  | 

- pessimistic locking:

  | org.jboss.cache.lock.TimeoutException: failure acquiring lock: fqn=/node, caller=GlobalTransaction:<10.0.0.226:32933>:2, lock=write owner=GlobalTransaction:<10.0.0.226:32934>:1 ([EMAIL PROTECTED])
  |     at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:528)
  |     at org.jboss.cache.interceptors.PessimisticLockInterceptor$LockManager.acquire(PessimisticLockInterceptor.java:579)
  |     at org.jboss.cache.interceptors.PessimisticLockInterceptor.acquireNodeLock(PessimisticLockInterceptor.java:393)
  |     at org.jboss.cache.interceptors.PessimisticLockInterceptor.lock(PessimisticLockInterceptor.java:329)
  |     at org.jboss.cache.interceptors.PessimisticLockInterceptor.invoke(PessimisticLockInterceptor.java:187)
  |     at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  |     at org.jboss.cache.interceptors.ReplicationInterceptor.invoke(ReplicationInterceptor.java:34)
  |     at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  |     at org.jboss.cache.interceptors.NotificationInterceptor.invoke(NotificationInterceptor.java:32)
  |     at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  |     at org.jboss.cache.interceptors.TxInterceptor.replayModifications(TxInterceptor.java:511)
  |     at org.jboss.cache.interceptors.TxInterceptor.handlePessimisticPrepare(TxInterceptor.java:394)
  |     at org.jboss.cache.interceptors.TxInterceptor.handleRemotePrepare(TxInterceptor.java:254)
  |     at org.jboss.cache.interceptors.TxInterceptor.invoke(TxInterceptor.java:100)
  |     at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  |     at org.jboss.cache.interceptors.CacheMgmtInterceptor.invoke(CacheMgmtInterceptor.java:123)
  |     at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  |     at org.jboss.cache.interceptors.InvocationContextInterceptor.invoke(InvocationContextInterceptor.java:62)
  |     at org.jboss.cache.CacheImpl.invokeMethod(CacheImpl.java:3939)
  |     at org.jboss.cache.CacheImpl._replicate(CacheImpl.java:2853)
  |     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  |     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  |     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  |     at java.lang.reflect.Method.invoke(Method.java:585)
  |     at org.jgroups.blocks.MethodCall.invoke(MethodCall.java:330)
  |     at org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher.handle(InactiveRegionAwareRpcDispatcher.java:77)
  |     at org.jgroups.blocks.RequestCorrelator.handleRequest(RequestCorrelator.java:624)
  |     at org.jgroups.blocks.RequestCorrelator.receiveMessage(RequestCorrelator.java:533)
  |     at org.jgroups.blocks.RequestCorrelator.receive(RequestCorrelator.java:365)
  |     at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:736)
  |     at org.jgroups.JChannel.up(JChannel.java:1063)
  |     at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:325)
  |     at org.jgroups.protocols.pbcast.FLUSH.up(FLUSH.java:406)
  |     at org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER.up(STREAMING_STATE_TRANSFER.java:255)
  |     at org.jgroups.protocols.FRAG2.up(FRAG2.java:197)
  |     at org.jgroups.protocols.pbcast.GMS.up(GMS.java:722)
  |     at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:234)
  |     at org.jgroups.protocols.UNICAST.up(UNICAST.java:263)
  |     at org.jgroups.protocols.pbcast.NAKACK.handleMessage(NAKACK.java:720)
  |     at org.jgroups.protocols.pbcast.NAKACK.up(NAKACK.java:546)
  |     at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:167)
  |     at org.jgroups.protocols.FD.up(FD.java:322)
  |     at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:298)
  |     at org.jgroups.protocols.MERGE2.up(MERGE2.java:145)
  |     at org.jgroups.protocols.Discovery.up(Discovery.java:220)
  |     at org.jgroups.protocols.TP$IncomingPacket.handleMyMessage(TP.java:1486)
  |     at org.jgroups.protocols.TP$IncomingPacket.run(TP.java:1440)
  |     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
  |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
  |     at java.lang.Thread.run(Thread.java:595)
  | Caused by: org.jboss.cache.lock.TimeoutException: write lock for /node could not be acquired after 1000 ms. Locks: Read lock owners: []
  | Write lock owner: GlobalTransaction:<10.0.0.226:32934>:1
  |  (caller=GlobalTransaction:<10.0.0.226:32933>:2, lock info: write owner=GlobalTransaction:<10.0.0.226:32934>:1 ([EMAIL PROTECTED]))
  |     at org.jboss.cache.lock.IdentityLock.acquireWriteLock0(IdentityLock.java:244)
  |     at org.jboss.cache.lock.IdentityLock.acquireWriteLock(IdentityLock.java:167)
  |     at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:497)
  |     ... 49 more
  | 
  | javax.transaction.RollbackException: outcome is false status: 1
  |     at org.jboss.cache.transaction.DummyTransaction.commit(DummyTransaction.java:75)
  |     at org.jboss.cache.transaction.DummyBaseTransactionManager.commit(DummyBaseTransactionManager.java:78)
  |     at org.jboss.cache.transaction.DummyUserTransaction.commit(DummyUserTransaction.java:77)
  |     at CacheRepl.CacheThread.doTransaction(CacheThread.java:31)
  |     at CacheRepl.CacheThread.run(CacheThread.java:53)
  | 

I don't care about the configuration details, but this test program should 
NEVER, EVER, NOT EVER, fail under any circumstances, except nuclear attack or 
something like that.
One cache should always get the lock first, write to the node, finish the 
transaction, then the other cache can do the same.
Just like 2 threads setting the same AtomicInteger - that NEVER throws an 
exception.
Is there any solution to this?
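
To make the AtomicInteger comparison concrete, here is a minimal sketch of the behaviour I would expect. It uses only java.util.concurrent, not JBoss Cache, and the class name AtomicSetDemo and the helper countFailures are made up for this illustration. Two threads set the same value 20 times each, and any exception is counted as a failure:

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicSetDemo {
    // Hypothetical helper: starts `threads` writers that each set the shared
    // value `repeat` times, and returns how many writes threw an exception.
    static int countFailures(int threads, int repeat) {
        final AtomicInteger shared = new AtomicInteger();
        final AtomicInteger failures = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < repeat; i++) {
                    try {
                        shared.set(id); // concurrent writes to the same variable
                    } catch (RuntimeException e) {
                        failures.incrementAndGet();
                    }
                }
            });
        }
        for (Thread w : workers) {
            w.start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failures: " + countFailures(2, 20));
    }
}
```

This always reports 0 failures: the writers just take turns and the last write wins. It is only an analogy, since it runs in a single JVM with no transactions and no replication, but it is the outcome I would like to see from the two caches.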

Thanks
Adrian
