This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch support/1.14 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push: new f477b35 GEODE-9404: Do not log error message if sender is not configured. (#6659) f477b35 is described below commit f477b3573bd08f515c690c3abf212546709e1346 Author: Eric Shu <e...@pivotal.io> AuthorDate: Fri Jul 9 12:09:31 2021 -0700 GEODE-9404: Do not log error message if sender is not configured. (#6659) * This is normal case for serial wan configuration. Error message should not be logged when executing transactions. * Log error message only if some events in a tx configured to group transaction but others do not have sender configured. * Should not wait for lastTransactionEvent in a tx if no sender configured or sender does not set must group transaction. (cherry picked from commit f0e328ba3eb4a1b2b47bdca5d565b79e88fecdca) --- .../geode/internal/cache/TXCommitMessage.java | 6 ++-- .../cache/TXLastEventInTransactionUtils.java | 21 +++++-------- .../org/apache/geode/internal/cache/TXState.java | 7 +++-- .../cache/TXLastEventInTransactionUtilsTest.java | 22 +++++++------- .../geode/internal/cache/wan/WANTestBase.java | 15 ++++++++++ ...lWANPropagation_PartitionedRegionDUnitTest.java | 34 ++++++++++++++++++++++ 6 files changed, 77 insertions(+), 28 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java index 3a8e121..3a19d76 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java @@ -738,7 +738,8 @@ public class TXCommitMessage extends PooledDistributionMessage } for (EntryEventImpl ee : callbacks) { - boolean isLastTransactionEvent = isConfigError || ee.equals(lastTransactionEvent); + boolean isLastTransactionEvent = TXLastEventInTransactionUtils + .isLastTransactionEvent(isConfigError, lastTransactionEvent, ee); try { if (ee.getOperation().isDestroy()) { ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true, @@ -760,7 +761,8 @@ public class TXCommitMessage extends PooledDistributionMessage } EntryEventImpl getLastTransactionEvent(List<EntryEventImpl> callbacks) { - return TXLastEventInTransactionUtils.getLastTransactionEvent(callbacks, dm.getCache()); + return TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(callbacks, + dm.getCache()); } protected void processCacheRuntimeException(CacheRuntimeException problem) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java index 0884ae2..377e6ca 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java @@ -20,15 +20,10 @@ import java.util.ServiceConfigurationError; import java.util.Set; import java.util.stream.Collectors; -import org.apache.logging.log4j.Logger; - import org.apache.geode.cache.Cache; import org.apache.geode.cache.wan.GatewaySender; -import org.apache.geode.logging.internal.log4j.api.LogService; public class TXLastEventInTransactionUtils { - private static final Logger logger = LogService.getLogger(); - /** * @param callbacks list of events belonging to a transaction * @@ -39,9 +34,8 @@ public class TXLastEventInTransactionUtils { * events belong have different sets of senders that group transactions * then it throws a ServiceConfigurationError exception. */ - public static EntryEventImpl getLastTransactionEvent(List<EntryEventImpl> callbacks, - Cache cache) - throws ServiceConfigurationError { + public static EntryEventImpl getLastTransactionEventInGroupedTxForWANSender( + List<EntryEventImpl> callbacks, Cache cache) throws ServiceConfigurationError { if (checkNoSendersGroupTransactionEvents(callbacks, cache)) { return null; } @@ -77,11 +71,7 @@ public class TXLastEventInTransactionUtils { Cache cache) throws ServiceConfigurationError { for (String senderId : getSenderIdsForEvents(callbacks)) { GatewaySender sender = cache.getGatewaySender(senderId); - if (sender == null) { - logger.error("No sender found for {}", senderId); - throw new ServiceConfigurationError("No information for senderId: " + senderId); - } - if (sender.mustGroupTransactionEvents()) { + if (sender != null && sender.mustGroupTransactionEvents()) { return false; } } @@ -117,4 +107,9 @@ public class TXLastEventInTransactionUtils { } return sender.mustGroupTransactionEvents(); } + + static boolean isLastTransactionEvent(boolean isConfigError, + EntryEventImpl lastTransactionEvent, EntryEventImpl entryEvent) { + return isConfigError || lastTransactionEvent == null || entryEvent.equals(lastTransactionEvent); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index 5e687c2..bdb1eaf 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -245,15 +245,16 @@ public class TXState implements TXStateInterface { boolean isConfigError = false; EntryEventImpl lastTransactionEvent = null; try { - lastTransactionEvent = - TXLastEventInTransactionUtils.getLastTransactionEvent(getPendingCallbacks(), getCache()); + lastTransactionEvent = TXLastEventInTransactionUtils + .getLastTransactionEventInGroupedTxForWANSender(getPendingCallbacks(), getCache()); } catch (ServiceConfigurationError ex) { logger.error(ex.getMessage()); isConfigError = true; } for (EntryEventImpl ee : getPendingCallbacks()) { - boolean isLastTransactionEvent = isConfigError || ee.equals(lastTransactionEvent); + boolean isLastTransactionEvent = TXLastEventInTransactionUtils + .isLastTransactionEvent(isConfigError, lastTransactionEvent, ee); if (ee.getOperation().isDestroy()) { ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true, isLastTransactionEvent); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java index d96af89..a2632bd 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java @@ -14,6 +14,7 @@ */ package org.apache.geode.internal.cache; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; @@ -125,7 +126,7 @@ public class TXLastEventInTransactionUtilsTest { events.add(event2); EntryEventImpl lastTransactionEvent = - TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache); + TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache); assertEquals(null, lastTransactionEvent); } @@ -140,7 +141,7 @@ public class TXLastEventInTransactionUtilsTest { events.add(event2); EntryEventImpl lastTransactionEvent = - TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache); + TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache); assertEquals(event2, lastTransactionEvent); } @@ -155,7 +156,7 @@ public class TXLastEventInTransactionUtilsTest { events.add(event2); EntryEventImpl lastTransactionEvent = - TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache); + TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache); assertEquals(event2, lastTransactionEvent); } @@ -169,13 +170,14 @@ public class TXLastEventInTransactionUtilsTest { events.add(event1); events.add(event2); - assertThatThrownBy(() -> TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache)) - .isInstanceOf(ServiceConfigurationError.class) - .hasMessageContaining("Not all events go to the same senders that group transactions"); + assertThatThrownBy(() -> TXLastEventInTransactionUtils + .getLastTransactionEventInGroupedTxForWANSender(events, cache)) + .isInstanceOf(ServiceConfigurationError.class) + .hasMessageContaining("Not all events go to the same senders that group transactions"); } @Test - public void getLastTransactionEventThrowsExceptionWhenSenderNotFound() { + public void getLastTransactionEventReturnsNullWhenSenderNotFound() { List<EntryEventImpl> events = new ArrayList(); EntryEventImpl event1 = createMockEntryEventImpl(region8); EntryEventImpl event2 = createMockEntryEventImpl(region8); @@ -183,9 +185,9 @@ public class TXLastEventInTransactionUtilsTest { events.add(event1); events.add(event2); - assertThatThrownBy(() -> TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache)) - .isInstanceOf(ServiceConfigurationError.class) - .hasMessage("No information for senderId: sender5"); + assertThat( + TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events, cache)) + .isNull(); } private EntryEventImpl createMockEntryEventImpl(InternalRegion region) { diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 0df83ca..15e2efa 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -2302,6 +2302,21 @@ public class WANTestBase extends DistributedTestCase { } } + public static void doTxPuts(String regionName, int numPuts) { + try ( + IgnoredException ignored = IgnoredException.addIgnoredException(InterruptedException.class); + IgnoredException ignored1 = + IgnoredException.addIgnoredException(GatewaySenderException.class)) { + Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numPuts; i++) { + cache.getCacheTransactionManager().begin(); + r.put(i, "Value_" + i); + cache.getCacheTransactionManager().commit(); + } + } + } + public static void doPutsSameKey(String regionName, int numPuts, String key) { IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class.getName()); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java index 5604a96..c56d104 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java @@ -79,6 +79,40 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase } @Test + public void testPartitionedSerialPropagationWithTXWhenSendersNotConfiguredOnAllServers() { + int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + vm4.invoke(() -> WANTestBase.startSender("ln")); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100, + isOffHeap())); + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100, + isOffHeap())); + + vm7.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR", 1000)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + } + + @Test public void testBothReplicatedAndPartitionedSerialPropagation() { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));