This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 81ae44d GEODE-4721: Fix a case that client proxy region returns empty set if … (#1532) 81ae44d is described below commit 81ae44d3d70283753168ba27ce76daad47824ba2 Author: pivotal-eshu <e...@pivotal.io> AuthorDate: Fri Mar 2 15:14:59 2018 -0800 GEODE-4721: Fix a case that client proxy region returns empty set if … (#1532) * GEODE-4721: Fix a case that client proxy region returns empty set if it is a first operation in the transaction. Add additional test cases on set operation with JTA. --- .../geode/internal/cache/TXStateProxyImpl.java | 13 +- .../cache/execute/PRSetOperationJTADUnitTest.java | 180 +++++++++++++ .../cache/tx/SetOperationJTADistributedTest.java | 290 +++++++++++++++++++++ .../internal/jta/ClientServerJTADUnitTest.java | 290 +++++++++++++++++++-- .../internal/jta/SetOperationJTAJUnitTest.java | 154 +++++++++++ 5 files changed, 906 insertions(+), 21 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java index 413c99c..1c4e922 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java @@ -577,11 +577,20 @@ public class TXStateProxyImpl implements TXStateProxy { } private boolean isTransactionInternalSuspendNeeded(LocalRegion region) { - boolean resetTxState = this.realDeal == null - && (!region.canStoreDataLocally() || restoreSetOperationTransactionBehavior); + // for peer accessor, do not bootstrap transaction in the node as subsequent operations + // will fail as transaction should be on data node only + boolean resetTxState = + this.realDeal == null && (isPeerAccessor(region) || restoreSetOperationTransactionBehavior); return resetTxState; } + private boolean isPeerAccessor(LocalRegion region) { + if (region.hasServerProxy()) { + return false; + } + return !region.canStoreDataLocally(); + } + @Override public Object getKeyForIterator(KeyInfo keyInfo, LocalRegion currRgn, boolean rememberReads, boolean allowTombstones) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRSetOperationJTADUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRSetOperationJTADUnitTest.java index 93875c5..6abbff2 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRSetOperationJTADUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRSetOperationJTADUnitTest.java @@ -14,6 +14,7 @@ */ package org.apache.geode.internal.cache.execute; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -38,7 +39,12 @@ import org.junit.runner.RunWith; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.TXStateProxyImpl; import org.apache.geode.internal.logging.LogService; @@ -58,6 +64,7 @@ public class PRSetOperationJTADUnitTest extends JUnit4CacheTestCase { private static final String REGION_NAME = "region1"; private Map<Long, String> testData; + private Map<Long, String> modifiedData; private VM accessor = null; private VM dataStore1 = null; @@ -79,6 +86,9 @@ public class PRSetOperationJTADUnitTest extends JUnit4CacheTestCase { testData.put(2L, "value2"); testData.put(3L, "duplicateValue"); testData.put(4L, "duplicateValue"); + modifiedData = new HashMap<>(); + modifiedData.putAll(testData); + modifiedData.put(5L, "newValue"); } @Override @@ -169,6 +179,7 @@ public class PRSetOperationJTADUnitTest extends JUnit4CacheTestCase { userTX.begin(); Collection<Long> set = region.keySet(); set.forEach((key) -> assertTrue(testData.keySet().contains(key))); + testData.keySet().forEach((key) -> assertTrue(set.contains(key))); } finally { validateTXManager(disableSetOpToStartJTA, isAccessor); if (!disableSetOpToStartJTA && !isAccessor) { @@ -186,6 +197,7 @@ public class PRSetOperationJTADUnitTest extends JUnit4CacheTestCase { userTX.begin(); Collection<String> set = region.values(); set.forEach((value) -> assertTrue(testData.values().contains(value))); + testData.values().forEach((value) -> assertTrue(set.contains(value))); } finally { validateTXManager(disableSetOpToStartJTA, isAccessor); if (!disableSetOpToStartJTA && !isAccessor) { @@ -206,6 +218,7 @@ public class PRSetOperationJTADUnitTest extends JUnit4CacheTestCase { assertTrue(testData.values().contains(entry.getValue())); assertTrue(testData.keySet().contains(entry.getKey())); }); + testData.entrySet().forEach((entry) -> assertTrue(set.contains(entry))); } finally { validateTXManager(disableSetOpToStartJTA, isAccessor); if (!disableSetOpToStartJTA && !isAccessor) { @@ -253,4 +266,171 @@ public class PRSetOperationJTADUnitTest extends JUnit4CacheTestCase { .setLocalMaxMemory(isAccessor ? 0 : 1).create()) .create(REGION_NAME); } + + @Test + public void testRegionValuesWithPutWhenSetOperationStartsJTA() throws Exception { + boolean disableSetOpToStartJTA = false; + setupRegion(disableSetOpToStartJTA); + + accessor.invoke(() -> verifyRegionValuesWhenSetOperationStartsJTA()); + dataStore1.invoke(() -> verifyRegionValuesWhenSetOperationStartsJTA()); + } + + private void setupRegion(boolean disableSetOpToStartJTA) { + accessor.invoke(() -> createCache(disableSetOpToStartJTA)); + dataStore1.invoke(() -> createCache(disableSetOpToStartJTA)); + accessor.invoke(() -> createPR(true)); + dataStore1.invoke(() -> createPR(false)); + dataStore1.invoke(() -> loadRegion()); + } + + + private void verifyRegionValuesWhenSetOperationStartsJTA() throws Exception { + Context ctx = cache.getJNDIContext(); + UserTransaction userTX = startUserTransaction(ctx); + Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME); + try { + userTX.begin(); + Collection<String> set = region.values(); + set.forEach((value) -> assertTrue(testData.values().contains(value))); + testData.values().forEach((value) -> assertTrue(set.contains(value))); + assertEquals(testData.size(), set.size()); + region.put(5L, "newValue"); + set.forEach((value) -> assertTrue(modifiedData.values().contains(value))); + modifiedData.values().forEach((value) -> assertTrue(set.contains(value))); + assertEquals(modifiedData.size(), set.size()); + } finally { + userTX.rollback(); + } + } + + @Test + public void testRegionValuesWithPutWhenSetOperationDoesNotStartJTA() throws Exception { + boolean disableSetOpToStartJTA = true; + setupRegion(disableSetOpToStartJTA); + + accessor.invoke(() -> verifyRegionValuesWhenSetOperationDoesNotStartJTA()); + dataStore1.invoke(() -> verifyRegionValuesWhenSetOperationDoesNotStartJTA()); + } + + + private void verifyRegionValuesWhenSetOperationDoesNotStartJTA() throws Exception { + Context ctx = cache.getJNDIContext(); + UserTransaction userTX = startUserTransaction(ctx); + Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME); + try { + userTX.begin(); + Collection<String> set = region.values(); + set.forEach((value) -> assertTrue(testData.values().contains(value))); + testData.values().forEach((value) -> assertTrue(set.contains(value))); + assertEquals(testData.size(), set.size()); + region.put(5L, "newValue"); + assertThatThrownBy(() -> set.contains("newValue")).isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "The Region collection is not transactional but is being used in a transaction"); + } finally { + userTX.rollback(); + } + } + + + @Test + public void testTxFunctionOnMemberWhenSetOperationDoesNotStartJTA() { + doTestTxFunction(true); + } + + @Test + public void testTxFunctionOnMemberWhenSetOperationStartsJTA() { + doTestTxFunction(false); + } + + private void doTestTxFunction(boolean disableSetOpToStartJTA) { + setupAndLoadRegion(disableSetOpToStartJTA); + + accessor.invoke(() -> registerFunction()); + dataStore1.invoke(() -> registerFunction()); + dataStore2.invoke(() -> registerFunction()); + dataStore3.invoke(() -> registerFunction()); + + accessor.invoke(() -> doTxFunction(disableSetOpToStartJTA)); + dataStore1.invoke(() -> doTxFunction(disableSetOpToStartJTA)); + dataStore2.invoke(() -> doTxFunction(disableSetOpToStartJTA)); + dataStore3.invoke(() -> doTxFunction(disableSetOpToStartJTA)); + } + + private void registerFunction() { + FunctionService.registerFunction(new TXFunctionSetOpDoesNoStartJTA()); + FunctionService.registerFunction(new TXFunctionSetOpStartsJTA()); + } + + class TXFunctionSetOpStartsJTA implements Function { + static final String id = "TXFunctionSetOpStartsJTA"; + + public void execute(FunctionContext context) { + Region r = null; + try { + verifyRegionValuesWhenSetOperationStartsJTA(); + } catch (Exception e) { + throw new RuntimeException(e); + } + context.getResultSender().lastResult(Boolean.TRUE); + } + + public String getId() { + return id; + } + + public boolean hasResult() { + return true; + } + + public boolean optimizeForWrite() { + return true; + } + + public boolean isHA() { + return false; + } + } + + class TXFunctionSetOpDoesNoStartJTA implements Function { + static final String id = "TXFunctionSetOpDoesNotStartJTA"; + + public void execute(FunctionContext context) { + Region r = null; + try { + verifyRegionValuesWhenSetOperationDoesNotStartJTA(); + } catch (Exception e) { + throw new RuntimeException(e); + } + context.getResultSender().lastResult(Boolean.TRUE); + } + + public String getId() { + return id; + } + + public boolean hasResult() { + return true; + } + + public boolean optimizeForWrite() { + return true; + } + + public boolean isHA() { + return false; + } + } + + private void doTxFunction(boolean disableSetOpToStartJTA) { + PartitionedRegion region = + (PartitionedRegion) basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME); + DistributedMember owner = region.getOwnerForKey(region.getKeyInfo(5L)); + if (disableSetOpToStartJTA) { + FunctionService.onMember(owner).execute(TXFunctionSetOpDoesNoStartJTA.id).getResult(); + } else { + FunctionService.onMember(owner).execute(TXFunctionSetOpStartsJTA.id).getResult(); + } + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/SetOperationJTADistributedTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/SetOperationJTADistributedTest.java new file mode 100644 index 0000000..48ee0b4 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/SetOperationJTADistributedTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tx; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import javax.naming.Context; +import javax.transaction.UserTransaction; + +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; +import org.apache.geode.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class SetOperationJTADistributedTest extends JUnit4CacheTestCase { + private static final Logger logger = LogService.getLogger(); + private static final String REGION_NAME = "region1"; + + private Map<Long, String> testData; + private Map<Long, String> modifiedData; + + private VM dataStore1 = null; + private VM dataStore2 = null; + private VM dataStore3 = null; + private VM dataStore4 = null; + + @Rule + public DistributedRestoreSystemProperties restoreSystemProperties = + new DistributedRestoreSystemProperties(); + + public SetOperationJTADistributedTest() { + super(); + } + + @Before + public void setup() { + testData = new HashMap<>(); + testData.put(1L, "value1"); + testData.put(2L, "value2"); + testData.put(3L, "duplicateValue"); + testData.put(4L, "duplicateValue"); + modifiedData = new HashMap<>(); + modifiedData.putAll(testData); + modifiedData.put(5L, "newValue"); + } + + @Override + public final void postSetUp() throws Exception { + disconnectAllFromDS(); // isolate this test from others to avoid periodic CacheExistsExceptions + Host host = Host.getHost(0); + dataStore1 = host.getVM(0); + dataStore2 = host.getVM(1); + dataStore3 = host.getVM(2); + dataStore4 = host.getVM(3); + } + + @Test + public void testRegionValuesWithPutWhenSetOperationStartsJTA() throws Exception { + setupAndLoadRegion(false); + dataStore1.invoke(() -> verifyRegionValuesWhenSetOperationStartsJTA()); + dataStore2.invoke(() -> verifyRegionValuesWhenSetOperationStartsJTA()); + dataStore3.invoke(() -> verifyRegionValuesWhenSetOperationStartsJTA()); + dataStore4.invoke(() -> verifyRegionValuesWhenSetOperationStartsJTA()); + } + + private void setupAndLoadRegion(boolean disableSetOpToStartJTA) { + createRegion(disableSetOpToStartJTA); + dataStore1.invoke(() -> loadRegion()); + } + + private void createRegion(boolean disableSetOpToStartJTA) { + dataStore1.invoke(() -> createCache(disableSetOpToStartJTA)); + dataStore2.invoke(() -> createCache(disableSetOpToStartJTA)); + dataStore3.invoke(() -> createCache(disableSetOpToStartJTA)); + dataStore4.invoke(() -> createCache(disableSetOpToStartJTA)); + + dataStore1.invoke(() -> createRegion()); + dataStore2.invoke(() -> createRegion()); + dataStore3.invoke(() -> createRegion()); + dataStore4.invoke(() -> createRegion()); + } + + final String restoreSetOperationTransactionBehavior = "restoreSetOperationTransactionBehavior"; + final String RESTORE_SET_OPERATION_PROPERTY = + (System.currentTimeMillis() % 2 == 0 ? DistributionConfig.GEMFIRE_PREFIX : "geode.") + + restoreSetOperationTransactionBehavior; + + private void createCache(boolean disableSetOpToStartJTA) { + if (disableSetOpToStartJTA) { + logger.info("setting system property {} to true ", RESTORE_SET_OPERATION_PROPERTY); + System.setProperty(RESTORE_SET_OPERATION_PROPERTY, "true"); + } + getCache(); + } + + private void createRegion() { + RegionFactory<Long, String> rf = basicGetCache().createRegionFactory(RegionShortcut.REPLICATE); + Region<Long, String> r = rf.create(REGION_NAME); + } + + private void loadRegion() { + Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME); + testData.forEach((k, v) -> region.put(k, v)); + } + + private void verifyRegionValuesWhenSetOperationStartsJTA() throws Exception { + Context ctx = getCache().getJNDIContext(); + UserTransaction userTX = startUserTransaction(ctx); + Region<Long, String> region = getCache().getRegion(Region.SEPARATOR + REGION_NAME); + try { + userTX.begin(); + Collection<String> set = region.values(); + set.forEach((value) -> assertTrue(testData.values().contains(value))); + testData.values().forEach((value) -> assertTrue(set.contains(value))); + assertEquals(testData.size(), set.size()); + region.put(5L, "newValue"); + set.forEach((value) -> assertTrue(modifiedData.values().contains(value))); + modifiedData.values().forEach((value) -> assertTrue(set.contains(value))); + assertEquals(modifiedData.size(), set.size()); + } finally { + userTX.rollback(); + } + } + + private UserTransaction startUserTransaction(Context ctx) throws Exception { + return (UserTransaction) ctx.lookup("java:/UserTransaction"); + } + + @Test + public void testRegionValuesWithPutWhenSetOperationDoesNotStartJTA() throws Exception { + setupAndLoadRegion(true); + dataStore1.invoke(() -> verifyRegionValuesWhenSetOperationDoesNotStartJTA()); + dataStore2.invoke(() -> verifyRegionValuesWhenSetOperationDoesNotStartJTA()); + dataStore3.invoke(() -> verifyRegionValuesWhenSetOperationDoesNotStartJTA()); + dataStore4.invoke(() -> verifyRegionValuesWhenSetOperationDoesNotStartJTA()); + } + + private void verifyRegionValuesWhenSetOperationDoesNotStartJTA() throws Exception { + Context ctx = getCache().getJNDIContext(); + UserTransaction userTX = startUserTransaction(ctx); + Region<Long, String> region = getCache().getRegion(Region.SEPARATOR + REGION_NAME); + try { + userTX.begin(); + Collection<String> set = region.values(); + set.forEach((value) -> assertTrue(testData.values().contains(value))); + testData.values().forEach((value) -> assertTrue(set.contains(value))); + assertEquals(testData.size(), set.size()); + region.put(5L, "newValue"); + assertThatThrownBy(() -> set.contains("newValue")).isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "The Region collection is not transactional but is being used in a transaction"); + } finally { + userTX.rollback(); + } + } + + @Test + public void testTxFunctionOnMemberWhenSetOperationDoesNotStartJTA() { + doTestTxFunction(true); + } + + @Test + public void testTxFunctionOnMemberWhenSetOperationStartsJTA() { + doTestTxFunction(false); + } + + private void doTestTxFunction(boolean disableSetOpToStartJTA) { + setupAndLoadRegion(disableSetOpToStartJTA); + dataStore1.invoke(() -> registerFunction()); + dataStore2.invoke(() -> registerFunction()); + dataStore3.invoke(() -> registerFunction()); + dataStore4.invoke(() -> registerFunction()); + + dataStore1.invoke(() -> doTxFunction(disableSetOpToStartJTA)); + dataStore2.invoke(() -> doTxFunction(disableSetOpToStartJTA)); + dataStore3.invoke(() -> doTxFunction(disableSetOpToStartJTA)); + dataStore4.invoke(() -> doTxFunction(disableSetOpToStartJTA)); + } + + class TXFunctionSetOpStartsJTA implements Function { + static final String id = "TXFunctionSetOpStartsJTA"; + + public void execute(FunctionContext context) { + Region r = null; + try { + verifyRegionValuesWhenSetOperationStartsJTA(); + } catch (Exception e) { + throw new RuntimeException(e); + } + context.getResultSender().lastResult(Boolean.TRUE); + } + + public String getId() { + return id; + } + + public boolean hasResult() { + return true; + } + + public boolean optimizeForWrite() { + return true; + } + + public boolean isHA() { + return false; + } + } + + class TXFunctionSetOpDoesNoStartJTA implements Function { + static final String id = "TXFunctionSetOpDoesNotStartJTA"; + + public void execute(FunctionContext context) { + Region r = null; + try { + verifyRegionValuesWhenSetOperationDoesNotStartJTA(); + } catch (Exception e) { + throw new RuntimeException(e); + } + context.getResultSender().lastResult(Boolean.TRUE); + } + + public String getId() { + return id; + } + + public boolean hasResult() { + return true; + } + + public boolean optimizeForWrite() { + return true; + } + + public boolean isHA() { + return false; + } + } + + private void registerFunction() { + FunctionService.registerFunction(new TXFunctionSetOpDoesNoStartJTA()); + FunctionService.registerFunction(new TXFunctionSetOpStartsJTA()); + } + + private void doTxFunction(boolean disableSetOpToStartJTA) { + DistributedMember owner = getCache().getDistributedSystem().getDistributedMember(); + if (disableSetOpToStartJTA) { + FunctionService.onMember(owner).execute(TXFunctionSetOpDoesNoStartJTA.id).getResult(); + } else { + FunctionService.onMember(owner).execute(TXFunctionSetOpStartsJTA.id).getResult(); + } + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java index d739010..8fee060 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java @@ -14,17 +14,30 @@ */ package org.apache.geode.internal.jta; -import static org.junit.Assert.*; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; +import java.util.Collection; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.naming.NamingException; +import javax.transaction.NotSupportedException; import javax.transaction.Status; +import javax.transaction.SystemException; import javax.transaction.TransactionManager; +import org.apache.logging.log4j.Logger; import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -36,6 +49,7 @@ import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.internal.cache.TXCommitMessage; import org.apache.geode.internal.cache.TXId; @@ -48,36 +62,49 @@ import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; import org.apache.geode.test.junit.categories.DistributedTest; @Category({DistributedTest.class}) public class ClientServerJTADUnitTest extends JUnit4CacheTestCase { + private static final Logger logger = LogService.getLogger(); private String key = "key"; private String value = "value"; + private String newKey = "newKey"; private String newValue = "newValue"; + private final String REGION_NAME = "testRegion"; final Host host = Host.getHost(0); final VM server = host.getVM(0); final VM client = host.getVM(1); + + @Rule + public DistributedRestoreSystemProperties restoreSystemProperties = + new DistributedRestoreSystemProperties(); + + @After + public void tearDown() throws Exception { + closeCache(); + } + @Test public void testClientTXStateStubBeforeCompletion() throws Exception { - final String regionName = getUniqueName(); getBlackboard().initBlackboard(); final Properties properties = getDistributedSystemProperties(); - final int port = server.invoke(() -> createServerRegion(regionName, properties)); + final int port = server.invoke(() -> createServerRegion(REGION_NAME, properties)); - client.invoke(() -> createClientRegion(host, port, regionName)); + client.invoke(() -> createClientRegion(host, port, REGION_NAME, false, false)); - createClientRegion(host, port, regionName); + createClientRegion(host, port, REGION_NAME, false, false); - Region region = getCache().getRegion(regionName); + Region region = getCache().getRegion(REGION_NAME); assertTrue(region.get(key).equals(value)); String first = "one"; String second = "two"; - client.invokeAsync(() -> commitTxWithBeforeCompletion(regionName, true, first, second)); + client.invokeAsync(() -> commitTxWithBeforeCompletion(REGION_NAME, true, first, second)); getBlackboard().waitForGate(first, 30, TimeUnit.SECONDS); TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager(); @@ -99,7 +126,7 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase { // the region could have the new value but still hold the locks. // Add the wait to check new JTA tx can be committed. Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS) - .atMost(30, TimeUnit.SECONDS).until(() -> ableToCommitNewTx(regionName, mgr)); + .atMost(30, TimeUnit.SECONDS).until(() -> ableToCommitNewTx(REGION_NAME, mgr)); } private boolean expectionLogged = false; @@ -130,11 +157,20 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase { return server; } - private void createClientRegion(final Host host, final int port0, String regionName) { + private void createClientRegion(final Host host, final int port0, String regionName, + boolean disableSetOpToStartJTA, boolean isCacheProxy) { ClientCacheFactory cf = new ClientCacheFactory(); cf.addPoolServer(host.getHostName(), port0); + if (disableSetOpToStartJTA) { + logger.info("setting system property {} to true ", RESTORE_SET_OPERATION_PROPERTY); + System.setProperty(RESTORE_SET_OPERATION_PROPERTY, "true"); + } ClientCache cache = getClientCache(cf); - cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName); + if (isCacheProxy) { + cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName); + } else { + cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName); + } } private void commitTxWithBeforeCompletion(String regionName, boolean withWait, String first, @@ -165,26 +201,25 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase { } private void testJTAWithMaxThreads(int maxThreads) { - final String regionName = getUniqueName(); getBlackboard().initBlackboard(); final Properties properties = getDistributedSystemProperties(); final int port = server.invoke("create cache", () -> { Cache cache = getCache(properties); CacheServer cacheServer = createCacheServer(cache, maxThreads); - Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName); + Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION_NAME); region.put(key, value); return cacheServer.getPort(); }); - createClientRegion(host, port, regionName); + createClientRegion(host, port, REGION_NAME, false, false); - Region region = getCache().getRegion(regionName); + Region region = getCache().getRegion(REGION_NAME); assertTrue(region.get(key).equals(value)); try { - commitTxWithBeforeCompletion(regionName, false, null, null); + commitTxWithBeforeCompletion(REGION_NAME, false, null, null); } catch (Exception e) { Assert.fail("got unexpected exception", e); } @@ -193,14 +228,13 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase { @Test public void testClientCompletedJTAIsInFailoverMap() throws Exception { - final String regionName = getUniqueName(); final Properties properties = getDistributedSystemProperties(); - final int port = server.invoke(() -> createServerRegion(regionName, properties)); + final int port = server.invoke(() -> createServerRegion(REGION_NAME, properties)); - createClientRegion(host, port, regionName); + createClientRegion(host, port, REGION_NAME, false, false); - Region region = getCache().getRegion(regionName); + Region region = getCache().getRegion(REGION_NAME); assertTrue(region.get(key).equals(value)); TransactionManager JTAManager = @@ -254,4 +288,222 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase { } + final String restoreSetOperationTransactionBehavior = "restoreSetOperationTransactionBehavior"; + final String RESTORE_SET_OPERATION_PROPERTY = + (System.currentTimeMillis() % 2 == 0 ? DistributionConfig.GEMFIRE_PREFIX : "geode.") + + restoreSetOperationTransactionBehavior; + + @Test + public void testValuesOnProxyWithPutWhenSetOperationDoesNotStartJTA() throws Exception { + final Properties properties = getDistributedSystemProperties(); + final int port = server.invoke(() -> createServerRegion(REGION_NAME, properties)); + createClientRegion(host, port, REGION_NAME, true, false); + + Region region = getCache().getRegion(REGION_NAME); + assertTrue(region.get(key).equals(value)); + + verifyRegionValueFailsWhenSetOperationDoesNotStartJTA(region); + } + + @Test + public void testValuesOnCacheProxyWithPutWhenSetOperationDoesNotStartJTA() throws Exception { + final Properties properties = getDistributedSystemProperties(); + final int port = server.invoke(() -> createServerRegion(REGION_NAME, properties)); + createClientRegion(host, port, REGION_NAME, true, true); + + Region region = getCache().getRegion(REGION_NAME); + assertTrue(region.get(key).equals(value)); + region.localDestroy(key); + + verifyRegionValueFailsWhenSetOperationDoesNotStartJTA(region); + } + + private void verifyRegionValueFailsWhenSetOperationDoesNotStartJTA(Region region) + throws NamingException, NotSupportedException, SystemException { + TransactionManager JTAManager = + (TransactionManager) getCache().getJNDIContext().lookup("java:/TransactionManager"); + assertNotNull(JTAManager); + + try { + JTAManager.begin(); + Collection values = region.values(); + assertFalse(values.contains(value)); + region.put(newKey, newValue); + assertThatThrownBy(() -> values.contains(newValue)).isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "The Region collection is not transactional but is being used in a transaction"); + } finally { + JTAManager.rollback(); + } + } + + @Test + public void testValuesOnProxyRegionWithPutWhenSetOperationStartsJTA() throws Exception { + final Properties properties = getDistributedSystemProperties(); + final int port = server.invoke(() -> createServerRegion(REGION_NAME, properties)); + createClientRegion(host, port, REGION_NAME, false, false); + + Region region = getCache().getRegion(REGION_NAME); + assertTrue(region.get(key).equals(value)); + Collection collection = region.values(); + // local cache data, not all operation forwarded to server (see GEODE-1887) + assertFalse(collection.contains(value)); + + verifyValuesUsingJTAWhenSetOperationStartsTransaction(region); + + verifyValuesWhenSetOperationStartsTransaction(region); + } + + @Test + public void testValuesOnCacheProxyRegionWithPutWhenSetOperationStartsJTA() throws Exception { + final Properties properties = getDistributedSystemProperties(); + final int port = server.invoke(() -> createServerRegion(REGION_NAME, properties)); + createClientRegion(host, port, REGION_NAME, false, true); + + Region region = getCache().getRegion(REGION_NAME); + assertTrue(region.get(key).equals(value)); + // local cache data + Collection values = region.values(); + assertTrue(values.contains(value)); + region.localDestroy(key); + assertFalse(values.contains(value)); + + verifyValuesUsingJTAWhenSetOperationStartsTransaction(region); + + verifyValuesWhenSetOperationStartsTransaction(region); + } + + private void verifyValuesWhenSetOperationStartsTransaction(Region region) { + TXManagerImpl txManager = getCache().getTxManager(); + try { + txManager.begin(); + Collection values = region.values(); + values.contains(value); + assertEquals(1, values.size()); + region.put(newKey, newValue); + assertTrue(values.contains(value)); + assertTrue(values.contains(newValue)); + assertEquals(2, values.size()); + } finally { + txManager.rollback(); + } + } + + private void verifyValuesUsingJTAWhenSetOperationStartsTransaction(Region region) + throws NamingException, NotSupportedException, SystemException { + TransactionManager JTAManager = + (TransactionManager) getCache().getJNDIContext().lookup("java:/TransactionManager"); + assertNotNull(JTAManager); + try { + JTAManager.begin(); + Collection values = region.values(); + // check server under JTA/Transaction + assertTrue(values.contains(value)); + assertEquals(1, values.size()); + region.put(newKey, newValue); + assertTrue(values.contains(value)); + assertTrue(values.contains(newValue)); + assertEquals(2, values.size()); + } finally { + JTAManager.rollback(); + } + } + + + @Test + public void testEntrySetOnProxyWithPutWhenSetOperationDoesNotStartJTA() throws Exception { + final Properties properties = getDistributedSystemProperties(); + final int port = server.invoke(() -> createServerRegion(REGION_NAME, properties)); + createClientRegion(host, port, REGION_NAME, true, false); + + Region region = getCache().getRegion(REGION_NAME); + assertTrue(region.get(key).equals(value)); + Map.Entry entry = region.getEntry(key); + + verifyEntrySetFailsWhenSetOperationDoesNotStartJTA(region, entry); + } + + @Test + public void testEntrySetOnCacheProxyWithPutWhenSetOperationDoesNotStartJTA() throws Exception { + final Properties properties = getDistributedSystemProperties(); + final int port = server.invoke(() -> createServerRegion(REGION_NAME, properties)); + createClientRegion(host, port, REGION_NAME, true, true); + + Region region = getCache().getRegion(REGION_NAME); + assertTrue(region.get(key).equals(value)); + Map.Entry entry = region.getEntry(key); + region.localDestroy(key); + + verifyEntrySetFailsWhenSetOperationDoesNotStartJTA(region, entry); + } + + private void verifyEntrySetFailsWhenSetOperationDoesNotStartJTA(Region region, Map.Entry entry) + throws NamingException, NotSupportedException, SystemException { + TransactionManager JTAManager = + (TransactionManager) getCache().getJNDIContext().lookup("java:/TransactionManager"); + assertNotNull(JTAManager); + + try { + JTAManager.begin(); + Collection<Map.Entry<String, String>> entrySet = region.entrySet(); + assertEquals(0, entrySet.size()); + region.put(newKey, newValue); + assertThatThrownBy(() -> entrySet.contains(entry)).isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "The Region collection is not transactional but is being used in a transaction"); + } finally { + JTAManager.rollback(); + } + } + + @Test + public void testEntrySetOnProxyRegionWithPutWhenSetOperationStartsJTA() throws Exception { + final Properties properties = getDistributedSystemProperties(); + final int port = server.invoke(() -> createServerRegion(REGION_NAME, properties)); + createClientRegion(host, port, REGION_NAME, false, false); + + Region region = getCache().getRegion(REGION_NAME); + assertTrue(region.get(key).equals(value)); + Map.Entry entry = region.getEntry(key); + + verifyEntrySetUsingJTAWhenSetOperationStartsTransaction(region); + } + + @Test + public void testEntrySetOnCacheProxyRegionWithPutWhenSetOperationStartsJTA() throws Exception { + final Properties properties = getDistributedSystemProperties(); + final int port = server.invoke(() -> createServerRegion(REGION_NAME, properties)); + createClientRegion(host, port, REGION_NAME, false, true); + + Region region = getCache().getRegion(REGION_NAME); + assertTrue(region.get(key).equals(value)); + Map.Entry entry = region.getEntry(key); + region.localDestroy(key); + + verifyEntrySetUsingJTAWhenSetOperationStartsTransaction(region); + } + + private void verifyEntrySetUsingJTAWhenSetOperationStartsTransaction(Region region) + throws NamingException, NotSupportedException, SystemException { + TransactionManager JTAManager = + (TransactionManager) getCache().getJNDIContext().lookup("java:/TransactionManager"); + assertNotNull(JTAManager); + try { + JTAManager.begin(); + Collection<Map.Entry<String, String>> entrySet = region.entrySet(); + entrySet.forEach((entry) -> { + assertTrue(entry.getValue().contains(value)); + assertTrue(entry.getKey().contains(key)); + }); + assertEquals(1, entrySet.size()); + region.put(newKey, value); + entrySet.forEach((entry) -> { + assertTrue(entry.getValue().contains(value)); + }); + assertEquals(2, entrySet.size()); + } finally { + JTAManager.rollback(); + } + } + } diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/SetOperationJTAJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/SetOperationJTAJUnitTest.java index 947af8b..4a295b2 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/jta/SetOperationJTAJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/jta/SetOperationJTAJUnitTest.java @@ -15,6 +15,8 @@ package org.apache.geode.internal.jta; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -42,6 +44,10 @@ import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.TXManagerImpl; @@ -56,6 +62,7 @@ public class SetOperationJTAJUnitTest { private static final String REGION_NAME = "region1"; private Map<Long, String> testData; + private Map<Long, String> modifiedData; private Cache cache; @Rule @@ -68,6 +75,9 @@ public class SetOperationJTAJUnitTest { testData.put(2L, "value2"); testData.put(3L, "duplicateValue"); testData.put(4L, "duplicateValue"); + modifiedData = new HashMap<>(); + modifiedData.putAll(testData); + modifiedData.put(5L, "newValue"); } @After @@ -85,6 +95,7 @@ public class SetOperationJTAJUnitTest { userTX.begin(); Collection<Long> set = region.keySet(); set.forEach((key) -> assertTrue(testData.keySet().contains(key))); + testData.keySet().forEach((key) -> assertTrue(set.contains(key))); } finally { validateTXManager(disableSetOpToStartJTA); if (!disableSetOpToStartJTA) { @@ -103,6 +114,7 @@ public class SetOperationJTAJUnitTest { userTX.begin(); Collection<String> set = region.values(); set.forEach((value) -> assertTrue(testData.values().contains(value))); + testData.values().forEach((value) -> assertTrue(set.contains(value))); } finally { validateTXManager(disableSetOpToStartJTA); if (!disableSetOpToStartJTA) { @@ -124,6 +136,7 @@ public class SetOperationJTAJUnitTest { assertTrue(testData.values().contains(entry.getValue())); assertTrue(testData.keySet().contains(entry.getKey())); }); + testData.entrySet().forEach((entry) -> assertTrue(set.contains(entry))); } finally { validateTXManager(disableSetOpToStartJTA); if (!disableSetOpToStartJTA) { @@ -180,4 +193,145 @@ public class SetOperationJTAJUnitTest { c.close(); } } + + @Test + public void testRegionValuesWithPutWhenSetOperationStartsJTA() throws Exception { + setupAndLoadRegion(false); + verifyRegionValuesWhenSetOperationStartsJTA(); + } + + private void verifyRegionValuesWhenSetOperationStartsJTA() throws Exception { + Context ctx = cache.getJNDIContext(); + UserTransaction userTX = startUserTransaction(ctx); + Region<Long, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME); + try { + userTX.begin(); + Collection<String> set = region.values(); + set.forEach((value) -> assertTrue(testData.values().contains(value))); + testData.values().forEach((value) -> assertTrue(set.contains(value))); + assertEquals(testData.size(), set.size()); + region.put(5L, "newValue"); + set.forEach((value) -> assertTrue(modifiedData.values().contains(value))); + modifiedData.values().forEach((value) -> assertTrue(set.contains(value))); + assertEquals(modifiedData.size(), set.size()); + } finally { + userTX.rollback(); + } + } + + @Test + public void testRegionValuesWithPutWhenSetOperationDoesNotStartJTA() throws Exception { + setupAndLoadRegion(true); + verifyRegionValuesWhenSetOperationDoesNotStartJTA(); + } + + private void verifyRegionValuesWhenSetOperationDoesNotStartJTA() throws Exception { + Context ctx = cache.getJNDIContext(); + UserTransaction userTX = startUserTransaction(ctx); + Region<Long, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME); + try { + userTX.begin(); + Collection<String> set = region.values(); + set.forEach((value) -> assertTrue(testData.values().contains(value))); + testData.values().forEach((value) -> assertTrue(set.contains(value))); + assertEquals(testData.size(), set.size()); + region.put(5L, "newValue"); + assertThatThrownBy(() -> set.contains("newValue")).isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "The Region collection is not transactional but is being used in a transaction"); + } finally { + userTX.rollback(); + } + } + + @Test + public void testTxFunctionOnMemberWhenSetOperationDoesNotStartJTA() { + doTestTxFunction(true); + } + + @Test + public void testTxFunctionOnMemberWhenSetOperationStartsJTA() { + doTestTxFunction(false); + } + + private void doTestTxFunction(boolean disableSetOpToStartJTA) { + setupAndLoadRegion(disableSetOpToStartJTA); + registerFunction(); + doTxFunction(disableSetOpToStartJTA); + } + + class TXFunctionSetOpStartsJTA implements Function { + static final String id = "TXFunctionSetOpStartsJTA"; + + public void execute(FunctionContext context) { + Region r = null; + try { + verifyRegionValuesWhenSetOperationStartsJTA(); + } catch (Exception e) { + throw new RuntimeException(e); + } + context.getResultSender().lastResult(Boolean.TRUE); + } + + public String getId() { + return id; + } + + public boolean hasResult() { + return true; + } + + public boolean optimizeForWrite() { + return true; + } + + public boolean isHA() { + return false; + } + } + + class TXFunctionSetOpDoesNoStartJTA implements Function { + static final String id = "TXFunctionSetOpDoesNotStartJTA"; + + public void execute(FunctionContext context) { + Region r = null; + try { + verifyRegionValuesWhenSetOperationDoesNotStartJTA(); + } catch (Exception e) { + throw new RuntimeException(e); + } + context.getResultSender().lastResult(Boolean.TRUE); + } + + public String getId() { + return id; + } + + public boolean hasResult() { + return true; + } + + public boolean optimizeForWrite() { + return true; + } + + public boolean isHA() { + return false; + } + } + + private void registerFunction() { + FunctionService.registerFunction(new TXFunctionSetOpDoesNoStartJTA()); + FunctionService.registerFunction(new TXFunctionSetOpStartsJTA()); + } + + private void doTxFunction(boolean disableSetOpToStartJTA) { + DistributedMember owner = cache.getDistributedSystem().getDistributedMember(); + if (disableSetOpToStartJTA) { + FunctionService.onMember(owner).execute(TXFunctionSetOpDoesNoStartJTA.id).getResult(); + } else { + FunctionService.onMember(owner).execute(TXFunctionSetOpStartsJTA.id).getResult(); + } + } + } -- To stop receiving notification emails like this one, please contact esh...@apache.org.