This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch release/1.4.0
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/release/1.4.0 by this push:
new 8fb9fe5 Merge pull request #1239 from dschneider-pivotal/feature/GEODE-4051
8fb9fe5 is described below
commit 8fb9fe5dca76b31d638e9865d3ae72c389ab3634
Author: Darrel Schneider <[email protected]>
AuthorDate: Mon Jan 8 11:49:29 2018 -0800
Merge pull request #1239 from dschneider-pivotal/feature/GEODE-4051
GEODE-4051: change StateMarkerMessage to always reply
(cherry picked from commit f905ea2bd178d4301787b6d123780408202533b3)
---
.../geode/internal/cache/StateFlushOperation.java | 60 ++++++++++------------
.../internal/cache/StateMarkerMessageTest.java | 32 ++++++++++++
2 files changed, 60 insertions(+), 32 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index fc3f075..c56fa26 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -347,24 +347,37 @@ public class StateFlushOperation {
protected void process(DistributionManager dm) {
logger.trace(LogMarker.STATE_FLUSH_OP, "Processing {}", this);
if (dm.getDistributionManagerId().equals(relayRecipient)) {
- // wait for inflight operations to the aeqs even if the recipient is the primary
- Set<DistributedRegion> regions = getRegions(dm);
- for (DistributedRegion r : regions) {
- if (r != null) {
- if (this.allRegions && r.doesNotDistribute()) {
- // no need to flush a region that does no distribution
- continue;
+ try {
+ // wait for inflight operations to the aeqs even if the recipient is the primary
+ Set<DistributedRegion> regions = getRegions(dm);
+ for (DistributedRegion r : regions) {
+ if (r != null) {
+ if (this.allRegions && r.doesNotDistribute()) {
+ // no need to flush a region that does no distribution
+ continue;
+ }
+ waitForCurrentOperations(r, r.isInitialized());
}
- waitForCurrentOperations(r, r.isInitialized());
}
+ } catch (CancelException ignore) {
+ // cache is closed - no distribution advisor available for the region so nothing to do but
+ // send the stabilization message
+ } catch (Exception e) {
+ logger.fatal(LocalizedMessage.create(
+ LocalizedStrings.StateFlushOperation_0__EXCEPTION_CAUGHT_WHILE_DETERMINING_CHANNEL_STATE,
+ this), e);
+ } finally {
+ // no need to send a relay request to this process - just send the
+ // ack back to the sender
+ StateStabilizedMessage ga = new StateStabilizedMessage();
+ ga.sendingMember = relayRecipient;
+ ga.setRecipient(this.getSender());
+ ga.setProcessorId(processorId);
+ if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
+ logger.trace(LogMarker.STATE_FLUSH_OP, "Sending {}", ga);
+ }
+ dm.putOutgoing(ga);
}
- // no need to send a relay request to this process - just send the
- // ack back to the sender
- StateStabilizedMessage ga = new StateStabilizedMessage();
- ga.sendingMember = relayRecipient;
- ga.setRecipient(this.getSender());
- ga.setProcessorId(processorId);
- dm.putOutgoing(ga);
} else {
// 1) wait for all messages based on the membership version (or older)
// at which the sender "joined" this region to be put on the pipe
@@ -416,23 +429,6 @@ public class StateFlushOperation {
logger.fatal(LocalizedMessage.create(
LocalizedStrings.StateFlushOperation_0__EXCEPTION_CAUGHT_WHILE_DETERMINING_CHANNEL_STATE,
this), e);
- } catch (ThreadDeath td) {
- throw td;
- } catch (VirtualMachineError err) {
- SystemFailure.initiateFailure(err);
- // If this ever returns, rethrow the error. We're poisoned
- // now, so don't let this thread continue.
- throw err;
- } catch (Throwable t) {
- // Whenever you catch Error or Throwable, you must also
- // catch VirtualMachineError (see above). However, there is
- // _still_ a possibility that you are dealing with a cascading
- // error condition, so you also need to check to see if the JVM
- // is still usable:
- SystemFailure.checkFailure();
- logger.fatal(LocalizedMessage.create(
- LocalizedStrings.StateFlushOperation_0__THROWABLE_CAUGHT_WHILE_DETERMINING_CHANNEL_STATE,
- this), t);
} finally {
if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
logger.trace(LogMarker.STATE_FLUSH_OP, "Sending {}", gr);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
index 5bec242..aa92b43 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
@@ -15,12 +15,20 @@
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.GemFireIOException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.StateFlushOperation.StateMarkerMessage;
import org.apache.geode.test.junit.categories.UnitTest;
@@ -33,4 +41,28 @@ public class StateMarkerMessageTest {
when(mockStateMarkerMessage.getProcessorType()).thenReturn(1);
assertThat(mockStateMarkerMessage.getProcessorType()).isEqualTo(1);
}
+
+ @Test
+ public void testProcessWithWaitForCurrentOperationsThatTimesOut() {
+ InternalDistributedMember relayRecipient = mock(InternalDistributedMember.class);
+ DistributionManager dm = mock(DistributionManager.class);
+ InternalCache gfc = mock(InternalCache.class);
+ DistributedRegion region = mock(DistributedRegion.class);
+ CacheDistributionAdvisor distributionAdvisor = mock(CacheDistributionAdvisor.class);
+
+ when(dm.getDistributionManagerId()).thenReturn(relayRecipient);
+ when(dm.getExistingCache()).thenReturn(gfc);
+ when(region.isInitialized()).thenReturn(true);
+ when(region.getDistributionAdvisor()).thenReturn(distributionAdvisor);
+ when(gfc.getRegionByPathForProcessing(any())).thenReturn(region);
+ doThrow(new GemFireIOException("expected in fatal log message")).when(distributionAdvisor)
+ .waitForCurrentOperations();
+
+ StateMarkerMessage message = new StateMarkerMessage();
+ message.relayRecipient = relayRecipient;
+
+ message.process(dm);
+
+ verify(dm, times(1)).putOutgoing(any());
+ }
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].