This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new f477b35  GEODE-9404: Do not log error message if sender is not configured. (#6659)
f477b35 is described below

commit f477b3573bd08f515c690c3abf212546709e1346
Author: Eric Shu <e...@pivotal.io>
AuthorDate: Fri Jul 9 12:09:31 2021 -0700

    GEODE-9404: Do not log error message if sender is not configured. (#6659)
    
    * This is the normal case for a serial WAN configuration. An error message
    should not be logged when executing transactions.
    
    * Log an error message only if some events in a tx are configured to group
    transactions but others do not have a sender configured.
    
    * Do not wait for lastTransactionEvent in a tx if no sender is configured
    or the sender does not require grouping of transaction events.
    
    (cherry picked from commit f0e328ba3eb4a1b2b47bdca5d565b79e88fecdca)
---
 .../geode/internal/cache/TXCommitMessage.java      |  6 ++--
 .../cache/TXLastEventInTransactionUtils.java       | 21 +++++--------
 .../org/apache/geode/internal/cache/TXState.java   |  7 +++--
 .../cache/TXLastEventInTransactionUtilsTest.java   | 22 +++++++-------
 .../geode/internal/cache/wan/WANTestBase.java      | 15 ++++++++++
 ...lWANPropagation_PartitionedRegionDUnitTest.java | 34 ++++++++++++++++++++++
 6 files changed, 77 insertions(+), 28 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 3a8e121..3a19d76 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -738,7 +738,8 @@ public class TXCommitMessage extends 
PooledDistributionMessage
     }
 
     for (EntryEventImpl ee : callbacks) {
-      boolean isLastTransactionEvent = isConfigError || 
ee.equals(lastTransactionEvent);
+      boolean isLastTransactionEvent = TXLastEventInTransactionUtils
+          .isLastTransactionEvent(isConfigError, lastTransactionEvent, ee);
       try {
         if (ee.getOperation().isDestroy()) {
           ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, 
ee, true,
@@ -760,7 +761,8 @@ public class TXCommitMessage extends 
PooledDistributionMessage
   }
 
   EntryEventImpl getLastTransactionEvent(List<EntryEventImpl> callbacks) {
-    return TXLastEventInTransactionUtils.getLastTransactionEvent(callbacks, 
dm.getCache());
+    return 
TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(callbacks,
+        dm.getCache());
   }
 
   protected void processCacheRuntimeException(CacheRuntimeException problem) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java
index 0884ae2..377e6ca 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java
@@ -20,15 +20,10 @@ import java.util.ServiceConfigurationError;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.wan.GatewaySender;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 public class TXLastEventInTransactionUtils {
-  private static final Logger logger = LogService.getLogger();
-
   /**
    * @param callbacks list of events belonging to a transaction
    *
@@ -39,9 +34,8 @@ public class TXLastEventInTransactionUtils {
    *         events belong have different sets of senders that group 
transactions
    *         then it throws a ServiceConfigurationError exception.
    */
-  public static EntryEventImpl getLastTransactionEvent(List<EntryEventImpl> 
callbacks,
-      Cache cache)
-      throws ServiceConfigurationError {
+  public static EntryEventImpl getLastTransactionEventInGroupedTxForWANSender(
+      List<EntryEventImpl> callbacks, Cache cache) throws 
ServiceConfigurationError {
     if (checkNoSendersGroupTransactionEvents(callbacks, cache)) {
       return null;
     }
@@ -77,11 +71,7 @@ public class TXLastEventInTransactionUtils {
       Cache cache) throws ServiceConfigurationError {
     for (String senderId : getSenderIdsForEvents(callbacks)) {
       GatewaySender sender = cache.getGatewaySender(senderId);
-      if (sender == null) {
-        logger.error("No sender found for {}", senderId);
-        throw new ServiceConfigurationError("No information for senderId: " + 
senderId);
-      }
-      if (sender.mustGroupTransactionEvents()) {
+      if (sender != null && sender.mustGroupTransactionEvents()) {
         return false;
       }
     }
@@ -117,4 +107,9 @@ public class TXLastEventInTransactionUtils {
     }
     return sender.mustGroupTransactionEvents();
   }
+
+  static boolean isLastTransactionEvent(boolean isConfigError,
+      EntryEventImpl lastTransactionEvent, EntryEventImpl entryEvent) {
+    return isConfigError || lastTransactionEvent == null || 
entryEvent.equals(lastTransactionEvent);
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 5e687c2..bdb1eaf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -245,15 +245,16 @@ public class TXState implements TXStateInterface {
     boolean isConfigError = false;
     EntryEventImpl lastTransactionEvent = null;
     try {
-      lastTransactionEvent =
-          
TXLastEventInTransactionUtils.getLastTransactionEvent(getPendingCallbacks(), 
getCache());
+      lastTransactionEvent = TXLastEventInTransactionUtils
+          
.getLastTransactionEventInGroupedTxForWANSender(getPendingCallbacks(), 
getCache());
     } catch (ServiceConfigurationError ex) {
       logger.error(ex.getMessage());
       isConfigError = true;
     }
 
     for (EntryEventImpl ee : getPendingCallbacks()) {
-      boolean isLastTransactionEvent = isConfigError || 
ee.equals(lastTransactionEvent);
+      boolean isLastTransactionEvent = TXLastEventInTransactionUtils
+          .isLastTransactionEvent(isConfigError, lastTransactionEvent, ee);
       if (ee.getOperation().isDestroy()) {
         ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, 
true,
             isLastTransactionEvent);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java
index d96af89..a2632bd 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtilsTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
@@ -125,7 +126,7 @@ public class TXLastEventInTransactionUtilsTest {
     events.add(event2);
 
     EntryEventImpl lastTransactionEvent =
-        TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache);
+        
TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events,
 cache);
 
     assertEquals(null, lastTransactionEvent);
   }
@@ -140,7 +141,7 @@ public class TXLastEventInTransactionUtilsTest {
     events.add(event2);
 
     EntryEventImpl lastTransactionEvent =
-        TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache);
+        
TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events,
 cache);
 
     assertEquals(event2, lastTransactionEvent);
   }
@@ -155,7 +156,7 @@ public class TXLastEventInTransactionUtilsTest {
     events.add(event2);
 
     EntryEventImpl lastTransactionEvent =
-        TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache);
+        
TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events,
 cache);
 
     assertEquals(event2, lastTransactionEvent);
   }
@@ -169,13 +170,14 @@ public class TXLastEventInTransactionUtilsTest {
     events.add(event1);
     events.add(event2);
 
-    assertThatThrownBy(() -> 
TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache))
-        .isInstanceOf(ServiceConfigurationError.class)
-        .hasMessageContaining("Not all events go to the same senders that 
group transactions");
+    assertThatThrownBy(() -> TXLastEventInTransactionUtils
+        .getLastTransactionEventInGroupedTxForWANSender(events, cache))
+            .isInstanceOf(ServiceConfigurationError.class)
+            .hasMessageContaining("Not all events go to the same senders that 
group transactions");
   }
 
   @Test
-  public void getLastTransactionEventThrowsExceptionWhenSenderNotFound() {
+  public void getLastTransactionEventReturnsNullWhenSenderNotFound() {
     List<EntryEventImpl> events = new ArrayList();
     EntryEventImpl event1 = createMockEntryEventImpl(region8);
     EntryEventImpl event2 = createMockEntryEventImpl(region8);
@@ -183,9 +185,9 @@ public class TXLastEventInTransactionUtilsTest {
     events.add(event1);
     events.add(event2);
 
-    assertThatThrownBy(() -> 
TXLastEventInTransactionUtils.getLastTransactionEvent(events, cache))
-        .isInstanceOf(ServiceConfigurationError.class)
-        .hasMessage("No information for senderId: sender5");
+    assertThat(
+        
TXLastEventInTransactionUtils.getLastTransactionEventInGroupedTxForWANSender(events,
 cache))
+            .isNull();
   }
 
   private EntryEventImpl createMockEntryEventImpl(InternalRegion region) {
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 0df83ca..15e2efa 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -2302,6 +2302,21 @@ public class WANTestBase extends DistributedTestCase {
     }
   }
 
+  public static void doTxPuts(String regionName, int numPuts) {
+    try (
+        IgnoredException ignored = 
IgnoredException.addIgnoredException(InterruptedException.class);
+        IgnoredException ignored1 =
+            
IgnoredException.addIgnoredException(GatewaySenderException.class)) {
+      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        cache.getCacheTransactionManager().begin();
+        r.put(i, "Value_" + i);
+        cache.getCacheTransactionManager().commit();
+      }
+    }
+  }
+
   public static void doPutsSameKey(String regionName, int numPuts, String key) 
{
     IgnoredException exp1 =
         
IgnoredException.addIgnoredException(InterruptedException.class.getName());
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
index 5604a96..c56d104 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
@@ -79,6 +79,40 @@ public class SerialWANPropagation_PartitionedRegionDUnitTest 
extends WANTestBase
   }
 
   @Test
+  public void 
testPartitionedSerialPropagationWithTXWhenSendersNotConfiguredOnAllServers() {
+    int lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    int nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, 
false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, 
false, null, true));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", "ln", 1, 100,
+        isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", "ln", 1, 100,
+        isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", "ln", 1, 100,
+        isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", "ln", 1, 100,
+        isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.startSender("ln"));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", null, 1, 100,
+        isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", null, 1, 100,
+        isOffHeap()));
+
+    vm7.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR", 1000));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000));
+  }
+
+  @Test
   public void testBothReplicatedAndPartitionedSerialPropagation() {
 
     Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));

Reply via email to