This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new a8844ca  GEODE-8486: record TransactionDataRebalancedException if tx 
put failed (#5500)
a8844ca is described below

commit a8844ca0f5ee3dbf92eb83688ccb149778b98f77
Author: Eric Shu <[email protected]>
AuthorDate: Thu Sep 10 13:02:26 2020 -0700

    GEODE-8486: record TransactionDataRebalancedException if tx put failed 
(#5500)
    
      This is used to handle a retry of transactional put.
---
 .../org/apache/geode/internal/cache/TXState.java   | 31 ++++++++--
 .../apache/geode/internal/cache/TXStateTest.java   | 68 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 5 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 10baf3f..ea2cd01 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -146,7 +146,9 @@ public class TXState implements TXStateInterface {
   /** keeps track of events, so as not to re-apply events */
   protected Set<EventID> seenEvents = new HashSet<EventID>();
   /** keeps track of results of txPutEntry */
-  private Map<EventID, Boolean> seenResults = new HashMap<EventID, Boolean>();
+  private Map<EventID, Boolean> seenResults = new HashMap<>();
+  /** keeps track of TransactionDataRebalancedException during txPutEntry */
+  private Map<EventID, TransactionDataRebalancedException> eventExceptions = 
new HashMap<>();
 
   @Immutable
   static final TXEntryState ENTRY_EXISTS = new TXEntryState();
@@ -193,19 +195,34 @@ public class TXState implements TXStateInterface {
     }
   }
 
-  private void recordEventAndResult(EntryEventImpl event, boolean result) {
+  void recordEventAndResult(EntryEventImpl event, boolean result) {
     recordEvent(event);
     if (event.getEventId() != null) {
       this.seenResults.put(event.getEventId(), result);
     }
   }
 
-  private boolean getRecordedResult(EntryEventImpl event) {
+  boolean getRecordedResult(EntryEventImpl event) {
     assert event != null;
     assert this.seenResults.containsKey(event.getEventId());
     return this.seenResults.get(event.getEventId());
   }
 
+  void recordEventException(EntryEventImpl event,
+      TransactionDataRebalancedException exception) {
+    if (event.getEventId() != null) {
+      eventExceptions.put(event.getEventId(), exception);
+    }
+  }
+
+  boolean getRecordedResultOrException(EntryEventImpl event) {
+    boolean result = getRecordedResult(event);
+    if (!result && eventExceptions.containsKey(event.getEventId())) {
+      throw eventExceptions.get(event.getEventId());
+    }
+    return result;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
@@ -1374,14 +1391,14 @@ public class TXState implements TXStateInterface {
     }
 
     if (hasSeenEvent(event)) {
-      return getRecordedResult(event);
+      return getRecordedResultOrException(event);
     }
 
     // if requireOldValue then oldValue gets set in event
     // (even if ifNew and entry exists)
     // !!!:ezoerner:20080813 need to handle ifOld for transactional on
     // PRs when PRs become transactional
-    TXEntryState tx = null;
+    TXEntryState tx;
     boolean result = false;
     try {
       tx = txWriteEntry(region, event, ifNew, requireOldValue, 
expectedOldValue);
@@ -1392,6 +1409,10 @@ public class TXState implements TXStateInterface {
       }
     } catch (EntryNotFoundException e) {
       result = false;
+    } catch (TransactionDataRebalancedException e) {
+      result = false;
+      recordEventException(event, e);
+      throw e;
     } finally {
       recordEventAndResult(event, result);
     }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 362ee5f..9055bb0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -32,7 +32,9 @@ import static org.mockito.Mockito.when;
 import javax.transaction.Status;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.EntryNotFoundException;
@@ -40,16 +42,22 @@ import 
org.apache.geode.cache.FailedSynchronizationException;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionDataRebalancedException;
 import org.apache.geode.cache.TransactionException;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 
 public class TXStateTest {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   private TXStateProxyImpl txStateProxy;
   private CommitConflictException exception;
   private TransactionDataNodeHasDepartedException 
transactionDataNodeHasDepartedException;
   private SingleThreadJTAExecutor executor;
   private InternalCache cache;
   private InternalDistributedSystem internalDistributedSystem;
+  private final EntryEventImpl event = mock(EntryEventImpl.class);
+  private final EventID eventID = mock(EventID.class);
 
   @Before
   public void setup() {
@@ -62,6 +70,7 @@ public class TXStateTest {
 
     when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
     
when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(event.getEventId()).thenReturn(eventID);
   }
 
   @Test
@@ -298,4 +307,63 @@ public class TXStateTest {
     verify(regionState1).cleanup(region1);
   }
 
+  @Test
+  public void getRecordedResultsReturnsFalseIfRecordedFalse() {
+    TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+    txState.recordEventAndResult(event, false);
+
+    assertThat(txState.getRecordedResult(event)).isFalse();
+  }
+
+  @Test
+  public void getRecordedResultsReturnsTrueIfRecordedTrue() {
+    TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+    txState.recordEventAndResult(event, true);
+
+    assertThat(txState.getRecordedResult(event)).isTrue();
+  }
+
+  @Test
+  public void getRecordedResultOrExceptionThrowsIfRecordedException() {
+    expectedException.expect(TransactionDataRebalancedException.class);
+    TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+    txState.recordEventAndResult(event, false);
+    txState.recordEventException(event, new 
TransactionDataRebalancedException(""));
+
+    txState.getRecordedResultOrException(event);
+  }
+
+  @Test
+  public void getRecordedResultOrExceptionReturnFalseIfRecordedFalse() {
+    TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+    txState.recordEventAndResult(event, false);
+
+    assertThat(txState.getRecordedResultOrException(event)).isFalse();
+  }
+
+  @Test
+  public void getRecordedResultOrExceptionReturnTrueIfRecordedTrue() {
+    TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+    txState.recordEventAndResult(event, true);
+
+    assertThat(txState.getRecordedResultOrException(event)).isTrue();
+  }
+
+  @Test
+  public void 
txPutEntryRecordExceptionIfFailedWithTransactionDataRebalancedException() {
+    expectedException.expect(TransactionDataRebalancedException.class);
+    TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+    boolean ifNew = true;
+    boolean requireOldValue = false;
+    boolean checkResources = false;
+    TransactionDataRebalancedException exception = new 
TransactionDataRebalancedException("");
+    InternalRegion region = mock(InternalRegion.class);
+    when(event.getRegion()).thenReturn(region);
+    doThrow(exception).when(txState).txWriteEntry(region, event, ifNew, 
requireOldValue, null);
+
+    assertThat(txState.txPutEntry(event, ifNew, requireOldValue, 
checkResources, null)).isFalse();
+
+    verify(txState, never()).getRecordedResultOrException(event);
+    verify(txState).recordEventException(event, exception);
+  }
 }

Reply via email to