[ 
https://issues.apache.org/jira/browse/GEODE-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Nedzvetsky updated GEODE-5486:
-------------------------------------
    Attachment: TXJReadConflictTest.java

> Two transactions with only read operations produce CommitConflictException if 
> property gemfire.detectReadConflict=true
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: GEODE-5486
>                 URL: https://issues.apache.org/jira/browse/GEODE-5486
>             Project: Geode
>          Issue Type: Bug
>          Components: transactions
>            Reporter: Eugene Nedzvetsky
>            Priority: Major
>         Attachments: TXJReadConflictTest.java
>
>
> Application was started with system property gemfire.detectReadConflict = 
> true.
> Some transactions with only read operations(region#get, ....)were broken 
> after that with an exception CommitConflictException:
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.geode.cache.CommitConflictException: The key  key1  in region  
> /TXJReadConflictTest  was being modified by another transaction locally.
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at 
> org.apache.geode.TXJReadConflictTest.testDetectReadConflict(TXJReadConflictTest.java:131)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>     at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>     at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: org.apache.geode.cache.CommitConflictException: The key  key1  in 
> region  /TXJReadConflictTest  was being modified by another transaction 
> locally.
>     at 
> org.apache.geode.internal.cache.TXReservationMgr.checkSetForConflict(TXReservationMgr.java:107)
>     at 
> org.apache.geode.internal.cache.TXReservationMgr.checkForConflict(TXReservationMgr.java:77)
>     at 
> org.apache.geode.internal.cache.TXReservationMgr.makeReservation(TXReservationMgr.java:56)
>     at 
> org.apache.geode.internal.cache.TXLockRequest.txLocalLock(TXLockRequest.java:151)
>     at 
> org.apache.geode.internal.cache.TXLockRequest.obtain(TXLockRequest.java:84)
>     at 
> org.apache.geode.internal.cache.TXState.reserveAndCheck(TXState.java:340)
>     at org.apache.geode.internal.cache.TXState.commit(TXState.java:411)
>     at 
> org.apache.geode.internal.cache.TXStateProxyImpl.commit(TXStateProxyImpl.java:210)
>     at 
> org.apache.geode.internal.cache.TXManagerImpl.commit(TXManagerImpl.java:413)
>     at 
> org.apache.geode.TXJReadConflictTest.lambda$testDetectReadConflict$1(TXJReadConflictTest.java:126)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> I reproduced this issue and created Integration test:
> {code}
> package org.apache.geode;
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more 
> contributor license
>  * agreements. See the NOTICE file distributed with this work for additional 
> information regarding
>  * copyright ownership. The ASF licenses this file to You under the Apache 
> License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance with the 
> License. You may obtain a
>  * copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software 
> distributed under the License
>  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 
> ANY KIND, either express
>  * or implied. See the License for the specific language governing 
> permissions and limitations under
>  * the License.
>  */
> import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
> import static org.junit.Assert.fail;
> import java.util.Properties;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Future;
> import java.util.concurrent.locks.StampedLock;
> import org.apache.geode.cache.AttributesFactory;
> import org.apache.geode.cache.Cache;
> import org.apache.geode.cache.CacheException;
> import org.apache.geode.cache.CacheFactory;
> import org.apache.geode.cache.CacheTransactionManager;
> import org.apache.geode.cache.Region;
> import org.apache.geode.cache.Scope;
> import org.apache.geode.distributed.DistributedSystem;
> import org.apache.geode.internal.cache.GemFireCacheImpl;
> import org.apache.geode.internal.cache.TXManagerImpl;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> /**
>  * Tests basic detectReadConflict mode transaction functionality
>  */
> public class TXJReadConflictTest {
>     private CacheTransactionManager txMgr;
>     protected GemFireCacheImpl cache;
>     protected Region<String, String> region;
>     @Before
>     public void setUpTXJUnitTest() throws Exception {
>         System.setProperty("gemfire.detectReadConflicts", "true");
>         createCache();
>     }
>     @After
>     public void tearDownTXJUnitTest() {
>         closeCache();
>     }
>     private void closeCache() {
>         if (this.cache != null) {
>             if (this.txMgr != null) {
>                 try {
>                     this.txMgr.rollback();
>                 } catch (IllegalStateException ignore) {
>                 }
>             }
>             this.region = null;
>             this.txMgr = null;
>             Cache c = this.cache;
>             this.cache = null;
>             c.close();
>         }
>     }
>     private void createCache() {
>         Properties properties = new Properties();
>         properties.setProperty(MCAST_PORT, "0"); // loner
>         this.cache = (GemFireCacheImpl) 
> CacheFactory.create(DistributedSystem.connect(properties));
>         createRegion();
>         this.txMgr = this.cache.getCacheTransactionManager();
>     }
>     private void createRegion() {
>         AttributesFactory<String, String> attributesFactory = new 
> AttributesFactory<>();
>         attributesFactory.setScope(Scope.DISTRIBUTED_NO_ACK);
>         attributesFactory.setConcurrencyChecksEnabled(false); // test 
> validation expects this behavior
>         attributesFactory.setIndexMaintenanceSynchronous(true);
>         this.region = this.cache.createRegion(getClass().getSimpleName(), 
> attributesFactory.create());
>     }
>     /**
>      * Test that two transactions with only read operations don't produce 
> CommitConflictException
>      * This test fill some initial value on startup and after that create 
> different threads with region#get operations
>      * Sync two different threads commit time through lock
>      */
>     @Test(/* no exception expected */)
>     public void testDetectReadConflict() throws CacheException, 
> ExecutionException, InterruptedException {
>         final TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr;
>         //fill some initial key-value on start
>         txMgrImpl.begin();
>         this.region.put("key1", "value1"); // non-tx
>         txMgrImpl.commit();
>         ExecutorService executor = Executors.newFixedThreadPool(2);
>         for (int i = 0; i < 20; i++) {
>             final StampedLock lock = new StampedLock();
>             lock.asWriteLock().lock();
>             //
>             Future<?> future1 = executor.submit(() -> {
>                 CacheTransactionManager txMgr2 = 
> cache.getCacheTransactionManager();
>                 txMgr2.begin();
>                 region.get("key1"); // non-tx
>                 lock.asReadLock().lock();
>                 txMgr2.commit();
>             });
>             Future<?> future2 = executor.submit(() -> {
>                 CacheTransactionManager txMgr3 = 
> cache.getCacheTransactionManager();
>                 txMgr3.begin();
>                 region.get("key1"); // non-tx
>                 lock.asReadLock().lock();
>                 txMgr3.commit();
>             });
>             pause(100);
>             lock.asWriteLock().unlock();
>             future1.get();
>             future2.get();
>         }
>     }
>     private void pause(int msWait) {
>         try {
>             Thread.sleep(msWait);
>         } catch (InterruptedException ignore) {
>             fail("interrupted");
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to