This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new a8844ca GEODE-8486: record TransactionDataRebalancedException if tx
put failed (#5500)
a8844ca is described below
commit a8844ca0f5ee3dbf92eb83688ccb149778b98f77
Author: Eric Shu <[email protected]>
AuthorDate: Thu Sep 10 13:02:26 2020 -0700
GEODE-8486: record TransactionDataRebalancedException if tx put failed
(#5500)
This is used to handle a retry of transactional put.
---
.../org/apache/geode/internal/cache/TXState.java | 31 ++++++++--
.../apache/geode/internal/cache/TXStateTest.java | 68 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 5 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 10baf3f..ea2cd01 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -146,7 +146,9 @@ public class TXState implements TXStateInterface {
/** keeps track of events, so as not to re-apply events */
protected Set<EventID> seenEvents = new HashSet<EventID>();
/** keeps track of results of txPutEntry */
- private Map<EventID, Boolean> seenResults = new HashMap<EventID, Boolean>();
+ private Map<EventID, Boolean> seenResults = new HashMap<>();
+ /** keeps track of TransactionDataRebalancedException during txPutEntry */
+ private Map<EventID, TransactionDataRebalancedException> eventExceptions =
new HashMap<>();
@Immutable
static final TXEntryState ENTRY_EXISTS = new TXEntryState();
@@ -193,19 +195,34 @@ public class TXState implements TXStateInterface {
}
}
- private void recordEventAndResult(EntryEventImpl event, boolean result) {
+ void recordEventAndResult(EntryEventImpl event, boolean result) {
recordEvent(event);
if (event.getEventId() != null) {
this.seenResults.put(event.getEventId(), result);
}
}
- private boolean getRecordedResult(EntryEventImpl event) {
+ boolean getRecordedResult(EntryEventImpl event) {
assert event != null;
assert this.seenResults.containsKey(event.getEventId());
return this.seenResults.get(event.getEventId());
}
+ void recordEventException(EntryEventImpl event,
+ TransactionDataRebalancedException exception) {
+ if (event.getEventId() != null) {
+ eventExceptions.put(event.getEventId(), exception);
+ }
+ }
+
+ boolean getRecordedResultOrException(EntryEventImpl event) {
+ boolean result = getRecordedResult(event);
+ if (!result && eventExceptions.containsKey(event.getEventId())) {
+ throw eventExceptions.get(event.getEventId());
+ }
+ return result;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
@@ -1374,14 +1391,14 @@ public class TXState implements TXStateInterface {
}
if (hasSeenEvent(event)) {
- return getRecordedResult(event);
+ return getRecordedResultOrException(event);
}
// if requireOldValue then oldValue gets set in event
// (even if ifNew and entry exists)
// !!!:ezoerner:20080813 need to handle ifOld for transactional on
// PRs when PRs become transactional
- TXEntryState tx = null;
+ TXEntryState tx;
boolean result = false;
try {
tx = txWriteEntry(region, event, ifNew, requireOldValue,
expectedOldValue);
@@ -1392,6 +1409,10 @@ public class TXState implements TXStateInterface {
}
} catch (EntryNotFoundException e) {
result = false;
+ } catch (TransactionDataRebalancedException e) {
+ result = false;
+ recordEventException(event, e);
+ throw e;
} finally {
recordEventAndResult(event, result);
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 362ee5f..9055bb0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -32,7 +32,9 @@ import static org.mockito.Mockito.when;
import javax.transaction.Status;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.EntryNotFoundException;
@@ -40,16 +42,22 @@ import
org.apache.geode.cache.FailedSynchronizationException;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionDataRebalancedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
public class TXStateTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
private TXStateProxyImpl txStateProxy;
private CommitConflictException exception;
private TransactionDataNodeHasDepartedException
transactionDataNodeHasDepartedException;
private SingleThreadJTAExecutor executor;
private InternalCache cache;
private InternalDistributedSystem internalDistributedSystem;
+ private final EntryEventImpl event = mock(EntryEventImpl.class);
+ private final EventID eventID = mock(EventID.class);
@Before
public void setup() {
@@ -62,6 +70,7 @@ public class TXStateTest {
when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+ when(event.getEventId()).thenReturn(eventID);
}
@Test
@@ -298,4 +307,63 @@ public class TXStateTest {
verify(regionState1).cleanup(region1);
}
+ @Test
+ public void getRecordedResultsReturnsFalseIfRecordedFalse() {
+ TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+ txState.recordEventAndResult(event, false);
+
+ assertThat(txState.getRecordedResult(event)).isFalse();
+ }
+
+ @Test
+ public void getRecordedResultsReturnsTrueIfRecordedTrue() {
+ TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+ txState.recordEventAndResult(event, true);
+
+ assertThat(txState.getRecordedResult(event)).isTrue();
+ }
+
+ @Test
+ public void getRecordedResultOrExceptionThrowsIfRecordedException() {
+ expectedException.expect(TransactionDataRebalancedException.class);
+ TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+ txState.recordEventAndResult(event, false);
+ txState.recordEventException(event, new
TransactionDataRebalancedException(""));
+
+ txState.getRecordedResultOrException(event);
+ }
+
+ @Test
+ public void getRecordedResultOrExceptionReturnFalseIfRecordedFalse() {
+ TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+ txState.recordEventAndResult(event, false);
+
+ assertThat(txState.getRecordedResultOrException(event)).isFalse();
+ }
+
+ @Test
+ public void getRecordedResultOrExceptionReturnTrueIfRecordedTrue() {
+ TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+ txState.recordEventAndResult(event, true);
+
+ assertThat(txState.getRecordedResultOrException(event)).isTrue();
+ }
+
+ @Test
+ public void
txPutEntryRecordExceptionIfFailedWithTransactionDataRebalancedException() {
+ expectedException.expect(TransactionDataRebalancedException.class);
+ TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
+ boolean ifNew = true;
+ boolean requireOldValue = false;
+ boolean checkResources = false;
+ TransactionDataRebalancedException exception = new
TransactionDataRebalancedException("");
+ InternalRegion region = mock(InternalRegion.class);
+ when(event.getRegion()).thenReturn(region);
+ doThrow(exception).when(txState).txWriteEntry(region, event, ifNew,
requireOldValue, null);
+
+ assertThat(txState.txPutEntry(event, ifNew, requireOldValue,
checkResources, null)).isFalse();
+
+ verify(txState, never()).getRecordedResultOrException(event);
+ verify(txState).recordEventException(event, exception);
+ }
}