OK, I wired up DualNodeJtaTransactionManagerImpl. The test now passes when userCount is 4 or less; at 5 or more it fails. However, the failure I am seeing with this test is NOT the same as the one we see with the test in our environment. It is this:
anonymous wrote : -------------------------------------------------------------------------------
| Test set: org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest
| -------------------------------------------------------------------------------
| Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 182.113 sec <<< FAILURE!
| testManyUsers(org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest) Time elapsed: 165.831 sec <<< FAILURE!
| junit.framework.AssertionFailedError: Timed out waiting for user threads to finish. Their state at the time of forced shutdown: TEST CONFIG [userCount=5, iterationsPerUser=40, thinkTimeMillis=10] STATE of UserRunners: org.hibernate.test.cache.jbc2.functional.mvccconcurrentwritetest$userrun...@ffc3eb[customerid=4 iterationsCompleted=30 completedAll=false causeOfFailure=org.hibernate.cache.CacheException: org.hibernate.cache.CacheException: org.jboss.cache.lock.TimeoutException: Unable to acquire lock on Fqn [/TS/test/org/hibernate/cache/UpdateTimestampsCache/Contacts] after [15000] milliseconds for requestor [Thread[UserRunnerThread-4,5,main]]! Lock held by [GlobalTransaction::903]
|     at org.hibernate.cache.jbc2.timestamp.TimestampsRegionImpl.put(TimestampsRegionImpl.java:130)
|     at org.hibernate.cache.UpdateTimestampsCache.preinvalidate(UpdateTimestampsCache.java:70)
|     at org.hibernate.engine.ActionQueue.execute(ActionQueue.java:275)
|     at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:263)
|     at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:167)
|     at org.hibernate.event.def.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:321)
|     at org.hibernate.event.def.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:50)
|     at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:1027)
|     at org.hibernate.impl.SessionImpl.managedFlush(SessionImpl.java:365)
|     at org.hibernate.transaction.CacheSynchronization.beforeCompletion(CacheSynchronization.java:88)
|     at org.hibernate.test.cache.jbc2.functional.util.DualNodeJtaTransactionImpl.commit(DualNodeJtaTransactionImpl.java:76)
|     at org.hibernate.test.cache.jbc2.functional.util.DualNodeJtaTransactionManagerImpl.commit(DualNodeJtaTransactionManagerImpl.java:123)
|     at org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest.commitTx(MVCCConcurrentWriteTest.java:255)
|     at org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest.addContact(MVCCConcurrentWriteTest.java:333)
|     at org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest.access$100(MVCCConcurrentWriteTest.java:29)
|     at org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner.run(MVCCConcurrentWriteTest.java:423)
|     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
|     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
|     at java.lang.Thread.run(Thread.java:619)
| Caused by: org.hibernate.cache.CacheException: org.jboss.cache.lock.TimeoutException: Unable to acquire lock on Fqn [/TS/test/org/hibernate/cache/UpdateTimestampsCache/Contacts] after [15000] milliseconds for requestor [Thread[UserRunnerThread-4,5,main]]! Lock held by [GlobalTransaction::903]
|     at org.hibernate.cache.jbc2.util.CacheHelper.put(CacheHelper.java:214)
|     at org.hibernate.cache.jbc2.timestamp.TimestampsRegionImpl.put(TimestampsRegionImpl.java:128)
|     ... 18 more]
| org.hibernate.test.cache.jbc2.functional.mvccconcurrentwritetest$userrun...@18bbb61[customerid=6 iterationsCompleted=28 completedAll=false causeOfFailure=]
| org.hibernate.test.cache.jbc2.functional.mvccconcurrentwritetest$userrun...@1cab18[customerid=2 iterationsCompleted=28 completedAll=false causeOfFailure=]
| org.hibernate.test.cache.jbc2.functional.mvccconcurrentwritetest$userrun...@9cd8db[customerid=3 iterationsCompleted=30 completedAll=false causeOfFailure=]
|

To repro:
- set userCount to 5 or more
- run the test and wait patiently for up to 5 minutes (not sure why, as with fewer than 5 user threads this test finished in 3 seconds).

The fact that this test isn't using a real JTA tx manager is a problem IMO, as the above exception seems to have more to do with TX cleanup (though I'd be happy to be proven wrong). Even if the above bug is worked around and the test starts passing, that won't prove there isn't a problem, since a crucial piece of the stack has been replaced with a simpler impl.
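To make the "TX cleanup" suspicion concrete, here is a minimal standalone sketch of the failure mode I have in mind. It is NOT JBoss Cache or Hibernate code: the Node class and its lock/unlock/owner methods are made-up names, and it simply assumes that a node's write lock is owned by a transaction and released only when that transaction is completed and cleaned up. If cleanup is skipped, the next writer times out, which would look like the "Lock held by [GlobalTransaction::903]" message above.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class LeakedNodeLockSketch {

    /** Hypothetical stand-in for one cache node whose write lock is owned by a transaction. */
    static final class Node {
        // A semaphore is used because the lock belongs to a transaction, not to a thread.
        private final Semaphore writeLock = new Semaphore(1);
        private volatile String owner; // id of the transaction holding the lock, for diagnostics

        /** Returns true if txId obtained the lock within timeoutMillis, false on timeout. */
        boolean lock(String txId, long timeoutMillis) throws InterruptedException {
            if (writeLock.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                owner = txId;
                return true;
            }
            return false;
        }

        /** Meant to be called from commit or rollback; skipping it leaks the lock. */
        void unlock() {
            owner = null;
            writeLock.release();
        }

        String owner() {
            return owner;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Node timestamps = new Node();

        // tx-1 locks the node and "completes" without cleanup (the suspected bug).
        timestamps.lock("tx-1", 100);

        // tx-2 now waits for the full timeout and gives up.
        boolean acquired = timestamps.lock("tx-2", 100);
        System.out.println("tx-2 acquired=" + acquired + ", lock held by " + timestamps.owner());

        // Once tx-1 is cleaned up, tx-2 can proceed.
        timestamps.unlock();
        System.out.println("after cleanup: tx-2 acquired=" + timestamps.lock("tx-2", 100));
    }
}
```

Whether the real stack behaves this way with DualNodeJtaTransactionManagerImpl is exactly what I can't tell from the test; the sketch only shows why a missed cleanup would surface as a lock timeout in a different thread.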
Here's the updated source:

package org.hibernate.test.cache.jbc2.functional;
|
| import java.util.HashSet;
| import java.util.Random;
| import java.util.Set;
| import java.util.concurrent.ExecutorService;
| import java.util.concurrent.Executors;
| import java.util.concurrent.TimeUnit;
| import javax.transaction.SystemException;
| import junit.framework.Test;
| import org.hibernate.FlushMode;
| import org.hibernate.Session;
| import org.hibernate.cfg.Configuration;
| import org.hibernate.exception.ExceptionUtils;
| import org.hibernate.junit.functional.FunctionalTestClassTestSuite;
| import org.hibernate.stat.SecondLevelCacheStatistics;
| import org.hibernate.test.cache.jbc2.functional.util.DualNodeConnectionProviderImpl;
| import org.hibernate.test.cache.jbc2.functional.util.DualNodeJtaTransactionManagerImpl;
| import org.hibernate.test.cache.jbc2.functional.util.DualNodeTestUtil;
| import org.hibernate.test.cache.jbc2.functional.util.DualNodeTransactionManagerLookup;
| import org.hibernate.transaction.CMTTransactionFactory;
| import org.slf4j.Logger;
| import org.slf4j.LoggerFactory;
|
| /**
|  *
|  * @author [email protected]
|  */
| public class MVCCConcurrentWriteTest extends MVCCJBossCacheTest {
|
|     private static final Logger LOG = LoggerFactory.getLogger(MVCCConcurrentWriteTest.class);
|     /**
|      * tests pass when USER_COUNT <= 4 and fail when USER_COUNT >= 5
|      */
|     final private int USER_COUNT = 5;
|     final private int ITERATION_COUNT = 40;
|     final private int THINK_TIME_MILLIS = 10;
|     final private long LAUNCH_INTERVAL_MILLIS = 10;
|     final private Random random = new Random();
|     /**
|      * kill switch used to stop all users when one fails;
|      * volatile so that a write by one user thread is visible to the others
|      */
|     private static volatile boolean TERMINATE_ALL_USERS = false;
|     /**
|      * collection of IDs of all customers participating in this test
|      */
|     private Set<Integer> customerIDs = new HashSet<Integer>();
|
|     public MVCCConcurrentWriteTest(String x) {
|         super(x);
|     }
|
|     /**
|      * test that DB can be queried
|      * @throws java.lang.Exception
|      */
|     public void testPingDb() throws Exception {
|         try {
|             beginTx();
|             getEnvironment().getSessionFactory().getCurrentSession().createQuery("from " + Customer.class.getName()).list();
|             commitTx();
|         } catch (Exception e) {
|             rollbackTx();
|             fail("failed to query DB; exception=" + e);
|         }
|     }
|
|     @Override
|     protected void prepareTest() throws Exception {
|         super.prepareTest();
|     }
|
|     @Override
|     protected void cleanupTest() throws Exception {
|         try {
|             super.cleanupTest();
|
|         } finally {
|             cleanup();
|             //DualNodeJtaTransactionManagerImpl.cleanupTransactions();
|             //DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers();
|         }
|     }
|
|     @Override
|     public void configure(Configuration cfg) {
|         super.configure(cfg);
|         cfg.setProperty(DualNodeTestUtil.NODE_ID_PROP, DualNodeTestUtil.LOCAL);
|     }
|
|     @Override
|     protected Class getConnectionProviderClass() {
|         return DualNodeConnectionProviderImpl.class;
|     }
|
|     @Override
|     protected Class getTransactionManagerLookupClass() {
|         return DualNodeTransactionManagerLookup.class;
|     }
|
|     @Override
|     protected Class getTransactionFactoryClass() {
|         return CMTTransactionFactory.class;
|     }
|
|     @Override
|     public void testEmptySecondLevelCacheEntry() throws Exception {
|         //do nothing
|     }
|
|     @Override
|     public void testQueryCacheInvalidation() {
|         //do nothing
|     }
|
|     @Override
|     public void testStaleWritesLeaveCacheConsistent() {
|         //do nothing
|     }
|
|     public void testSingleUser() throws Exception {
|         //setup
|         Customer customer = createCustomer(0);
|         final Integer customerId = customer.getId();
|         getCustomerIDs().add(customerId);
|
|         assertNull("contact exists despite not being added", getFirstContact(customerId));
|
|         //check that cache was hit
|         SecondLevelCacheStatistics customerSlcs = getEnvironment().getSessionFactory().getStatistics().getSecondLevelCacheStatistics(
|                 getPrefixedRegionName(Customer.class.getName()));
|         assertEquals(1, customerSlcs.getPutCount());
|         assertEquals(1, customerSlcs.getElementCountInMemory());
|         assertEquals(1, customerSlcs.getEntries().size());
|
|         SecondLevelCacheStatistics contactsCollectionSlcs = getEnvironment().getSessionFactory().getStatistics().getSecondLevelCacheStatistics(
|                 getPrefixedRegionName(Customer.class.getName() + ".contacts"));
|         assertEquals(1, contactsCollectionSlcs.getPutCount());
|         assertEquals(1, contactsCollectionSlcs.getElementCountInMemory());
|         assertEquals(1, contactsCollectionSlcs.getEntries().size());
|
|         final Contact contact = addContact(customerId);
|         assertNotNull("contact returned by addContact is null", contact);
|         assertEquals("Customer.contacts cache was not invalidated after addContact",
|                 0, contactsCollectionSlcs.getElementCountInMemory());
|
|         assertNotNull("Contact missing after successful add call", getFirstContact(customerId));
|
|
|         //read everyone's contacts
|         readEveryonesFirstContact();
|
|         removeContact(customerId);
|         assertNull("contact still exists after successful remove call", getFirstContact(customerId));
|
|     }
|
|     public void testManyUsers() throws Exception {
|
|         //setup - create users
|         for (int i = 0; i < USER_COUNT; i++) {
|             Customer customer = createCustomer(0);
|             getCustomerIDs().add(customer.getId());
|         }
|
|         assertEquals("failed to create enough Customers", USER_COUNT, getCustomerIDs().size());
|
|         final ExecutorService pool = Executors.newFixedThreadPool(USER_COUNT);
|
|         Set<UserRunner> runners = new HashSet<UserRunner>();
|         for (Integer customerId : getCustomerIDs()) {
|             UserRunner r = new UserRunner(customerId);
|             runners.add(r);
|             pool.execute(r);
|             LOG.info("launched " + r);
|             Thread.sleep(LAUNCH_INTERVAL_MILLIS); //rampup
|         }
|
|         assertEquals("not all user threads launched", USER_COUNT, runners.size());
|
|         pool.shutdown();
|         boolean finishedInTime = pool.awaitTermination(120, TimeUnit.SECONDS);
|
|         if (!finishedInTime) { //timed out waiting for users to finish
|             fail("Timed out waiting for user threads to finish. Their state at the time of forced shutdown: " + statusOfRunnersToString(runners));
|         } else {
|             //if here -> pool finished before timing out
|             //check whether all runners succeeded
|             boolean success = true;
|             for (UserRunner r : runners) {
|                 if (!r.isSuccess()) {
|                     success = false;
|                     break;
|                 }
|             }
|             assertTrue("at least one UserRunner failed: " + statusOfRunnersToString(runners), success);
|         }
|     }
|
|     public void cleanup() throws Exception {
|
|         getCustomerIDs().clear();
|
|         String deleteContactHQL = "delete from Contact";
|         String deleteCustomerHQL = "delete from Customer";
|
|         beginTx();
|         try {
|
|             //Session session = getSessions().getCurrentSession();
|             Session session = getEnvironment().getSessionFactory().getCurrentSession();
|             session.createQuery(deleteContactHQL).setFlushMode(FlushMode.AUTO).executeUpdate();
|             session.createQuery(deleteCustomerHQL).setFlushMode(FlushMode.AUTO).executeUpdate();
|             commitTx();
|         } catch (Exception e) {
|             rollbackTx();
|             throw e;
|         }
|
|     }
|
|     private Customer createCustomer(int nameSuffix) throws Exception {
|         beginTx();
|         try {
|             Customer customer = new Customer();
|             customer.setName("customer_" + nameSuffix);
|             customer.setContacts(new HashSet<Contact>());
|
|             getEnvironment().getSessionFactory().getCurrentSession().persist(customer);
|             commitTx();
|             return customer;
|         } catch (Exception e) {
|             rollbackTx();
|             throw e;
|         }
|     }
|
|     /**
|      * delegate method since I'm trying to figure out which txManager to use
|      * given that this test runs multiple threads (SimpleJtaTxMgrImpl isn't suited for that).
|      *
|      * What is needed is a thread-safe JTATransactionManager impl that can handle concurrent TXs
|      *
|      * @throws java.lang.Exception
|      */
|     private void beginTx() throws Exception {
|         DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).begin();
|     }
|
|     /**
|      * @see #beginTx()
|      * @throws java.lang.Exception
|      */
|     private void commitTx() throws Exception {
|         DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).commit();
|     }
|
|     /**
|      * @see #beginTx()
|      * @throws java.lang.Exception
|      */
|     private void rollbackTx() throws Exception {
|         DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).rollback();
|     }
|
|     /**
|      * read first contact of every Customer participating in this test.
|      * this forces concurrent cache writes of Customer.contacts Collection cache node
|      *
|      * @throws java.lang.Exception
|      */
|     private void readEveryonesFirstContact() throws Exception {
|         beginTx();
|         try {
|             for (Integer customerId : getCustomerIDs()) {
|                 final Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
|                 Set<Contact> contacts = customer.getContacts();
|                 Contact firstContact = contacts.isEmpty() ? null : contacts.iterator().next();
|             }
|             commitTx();
|         } catch (Exception e) {
|             rollbackTx();
|             throw e;
|         }
|     }
|
|     /**
|      * -load existing Customer
|      * -get customer's contacts; return 1st one
|      *
|      * @param customerId
|      * @return first Contact or null if customer has none
|      */
|     private Contact getFirstContact(Integer customerId) throws Exception {
|         assert customerId != null;
|
|         beginTx();
|         try {
|             final Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
|             Set<Contact> contacts = customer.getContacts();
|             Contact firstContact = contacts.isEmpty() ? null : contacts.iterator().next();
|             commitTx();
|             return firstContact;
|         } catch (Exception e) {
|             rollbackTx();
|             throw e;
|         }
|     }
|
|     /**
|      * -load existing Customer
|      * -create a new Contact and add to customer's contacts
|      *
|      * @param customerId
|      * @return added Contact
|      */
|     private Contact addContact(Integer customerId) throws Exception {
|         assert customerId != null;
|
|         beginTx();
|         try {
|             final Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
|
|             Contact contact = new Contact();
|             contact.setName("contact name");
|             contact.setTlf("wtf is tlf?");
|
|             contact.setCustomer(customer);
|             customer.getContacts().add(contact);
|
|             //assuming contact is persisted via cascade from customer
|             commitTx();
|
|             return contact;
|         } catch (Exception e) {
|             rollbackTx();
|             throw e;
|         }
|     }
|
|     /**
|      * remove the customer's single existing contact from the customer's list of contacts
|      *
|      * @param customerId
|      * @throws IllegalStateException if customer does not own exactly one contact
|      */
|     private void removeContact(Integer customerId) throws Exception {
|         assert customerId != null;
|
|         beginTx();
|         try {
|             Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
|             Set<Contact> contacts = customer.getContacts();
|             if (contacts.size() != 1) {
|                 throw new IllegalStateException("can't remove contact: customer id=" + customerId + " expected exactly 1 contact, " +
|                         "actual count=" + contacts.size());
|             }
|
|             Contact contact = contacts.iterator().next();
|             contacts.remove(contact);
|             contact.setCustomer(null);
|
|             //explicitly delete Contact because hbm has no 'DELETE_ORPHAN' cascade?
|             //getEnvironment().getSessionFactory().getCurrentSession().delete(contact); //appears to not be needed
|
|             //assuming contact is persisted via cascade from customer
|             commitTx();
|         } catch (Exception e) {
|             rollbackTx();
|             throw e;
|         }
|     }
|
|     /**
|      * @return the customerIDs
|      */
|     public Set<Integer> getCustomerIDs() {
|         return customerIDs;
|     }
|
|     private String statusOfRunnersToString(Set<UserRunner> runners) {
|         assert runners != null;
|
|         StringBuilder sb = new StringBuilder("TEST CONFIG [userCount=" + USER_COUNT +
|                 ", iterationsPerUser=" + ITERATION_COUNT +
|                 ", thinkTimeMillis=" + THINK_TIME_MILLIS + "] " +
|                 " STATE of UserRunners: ");
|
|         for (UserRunner r : runners) {
|             sb.append(r.toString() + System.getProperty("line.separator"));
|         }
|         return sb.toString();
|     }
|
|     class UserRunner implements Runnable {
|
|         final private Integer customerId;
|         private int completedIterations = 0;
|         private Throwable causeOfFailure;
|
|         public UserRunner(final Integer cId) {
|             assert cId != null;
|             this.customerId = cId;
|         }
|
|         private boolean contactExists() throws Exception {
|             return getFirstContact(customerId) != null;
|         }
|
|         public void run() {
|
|             //name this thread for easier log tracing
|             Thread.currentThread().setName("UserRunnerThread-" + getCustomerId());
|             try {
|                 for (int i = 0; i < ITERATION_COUNT; i++) {
|
|                     if (contactExists()) {
|                         throw new IllegalStateException("contact already exists before add, customerId=" + customerId);
|                     }
|
|                     addContact(customerId);
|
|                     if (!contactExists()) {
|                         throw new IllegalStateException("contact missing after successful add, customerId=" + customerId);
|                     }
|
|                     thinkRandomTime();
|
|                     //read everyone's contacts
|                     readEveryonesFirstContact();
|
|                     thinkRandomTime();
|
|                     removeContact(customerId);
|
|                     if (contactExists()) {
|                         throw new IllegalStateException("contact still exists after successful remove call, customerId=" + customerId);
|                     }
|
|                     thinkRandomTime();
|
|                     ++completedIterations;
|                 }
|
|             } catch (Throwable t) {
|
|                 this.causeOfFailure = t;
|                 TERMINATE_ALL_USERS = true;
|
|                 //rollback current transaction if any
|                 //really should not happen since above methods all follow begin-commit-rollback pattern
|                 try {
|                     if (DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).getTransaction() != null) {
|                         DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).rollback();
|                     }
|                 } catch (SystemException ex) {
|                     throw new RuntimeException("failed to rollback tx", ex);
|                 }
|             }
|         }
|
|         public boolean isSuccess() {
|             return ITERATION_COUNT == getCompletedIterations();
|         }
|
|         public int getCompletedIterations() {
|             return completedIterations;
|         }
|
|         public Throwable getCauseOfFailure() {
|             return causeOfFailure;
|         }
|
|         public Integer getCustomerId() {
|             return customerId;
|         }
|
|         @Override
|         public String toString() {
|             return super.toString() +
|                     "[customerId=" + getCustomerId() +
|                     " iterationsCompleted=" + getCompletedIterations() +
|                     " completedAll=" + isSuccess() +
|                     " causeOfFailure=" + (this.causeOfFailure != null ? ExceptionUtils.getStackTrace(causeOfFailure) : "") + "] ";
|         }
|     }
|
|     /**
|      * sleep between 0 and THINK_TIME_MILLIS.
|      * @throws RuntimeException if sleep is interrupted or TERMINATE_ALL_USERS flag was set to true in the meantime
|      */
|     private void thinkRandomTime() {
|         try {
|             Thread.sleep(random.nextInt(THINK_TIME_MILLIS));
|         } catch (InterruptedException ex) {
|             throw new RuntimeException("sleep interrupted", ex);
|         }
|
|         if (TERMINATE_ALL_USERS) {
|             throw new RuntimeException("told to terminate (because a UserRunner had failed)");
|         }
|     }
|
|     public static Test suite() {
|         return new FunctionalTestClassTestSuite(MVCCConcurrentWriteTest.class);
|     }
| }
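For anyone who wants to play with the threading skeleton of testManyUsers without Hibernate or JBoss Cache on the classpath, here is a minimal standalone sketch of the same pattern: a shared kill switch, a fixed thread pool, and a bounded wait for termination. All names in it (KillSwitchSketch, Worker, runAll, failAt) are made up for illustration, and the simulated failure stands in for whatever exception a real user thread would hit. It uses an AtomicBoolean for the kill switch so that a failure in one worker is reliably seen by the others.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class KillSwitchSketch {

    /** Shared stop flag, set by the first worker that fails. */
    static final AtomicBoolean terminateAll = new AtomicBoolean(false);

    /** Hypothetical worker: runs `iterations` steps and fails at step `failAt` (-1 = never). */
    static final class Worker implements Runnable {
        final int iterations;
        final int failAt;
        final AtomicInteger completed = new AtomicInteger();

        Worker(int iterations, int failAt) {
            this.iterations = iterations;
            this.failAt = failAt;
        }

        public void run() {
            try {
                for (int i = 0; i < iterations; i++) {
                    if (terminateAll.get()) {
                        return; // another worker failed, stop early
                    }
                    if (i == failAt) {
                        throw new IllegalStateException("simulated failure");
                    }
                    Thread.sleep(1); // stand-in for real work plus think time
                    completed.incrementAndGet();
                }
            } catch (Throwable t) {
                terminateAll.set(true); // tell everyone else to stop
            }
        }

        boolean isSuccess() {
            return completed.get() == iterations;
        }
    }

    /** Returns true only if every worker finished all of its iterations before the timeout. */
    static boolean runAll(List<Worker> workers, long timeoutSeconds) throws InterruptedException {
        terminateAll.set(false);
        ExecutorService pool = Executors.newFixedThreadPool(workers.size());
        for (Worker w : workers) {
            pool.execute(w);
        }
        pool.shutdown();
        if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // interrupt stragglers rather than leaving them running
            return false;
        }
        for (Worker w : workers) {
            if (!w.isSuccess()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAll(Arrays.asList(new Worker(5, -1), new Worker(5, -1)), 10));
        System.out.println(runAll(Arrays.asList(new Worker(5, -1), new Worker(5, 2)), 10));
    }
}
```

The first call in main has no failing worker, the second has one that fails on its third step, so the two calls should report success and failure respectively.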
