This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch release/1.4.0
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/release/1.4.0 by this push:
     new 8fb9fe5  Merge pull request #1239 from 
dschneider-pivotal/feature/GEODE-4051
8fb9fe5 is described below

commit 8fb9fe5dca76b31d638e9865d3ae72c389ab3634
Author: Darrel Schneider <[email protected]>
AuthorDate: Mon Jan 8 11:49:29 2018 -0800

    Merge pull request #1239 from dschneider-pivotal/feature/GEODE-4051
    
    GEODE-4051: change StateMarkerMessage to always reply
    (cherry picked from commit f905ea2bd178d4301787b6d123780408202533b3)
---
 .../geode/internal/cache/StateFlushOperation.java  | 60 ++++++++++------------
 .../internal/cache/StateMarkerMessageTest.java     | 32 ++++++++++++
 2 files changed, 60 insertions(+), 32 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index fc3f075..c56fa26 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -347,24 +347,37 @@ public class StateFlushOperation {
     protected void process(DistributionManager dm) {
       logger.trace(LogMarker.STATE_FLUSH_OP, "Processing {}", this);
       if (dm.getDistributionManagerId().equals(relayRecipient)) {
-        // wait for inflight operations to the aeqs even if the recipient is 
the primary
-        Set<DistributedRegion> regions = getRegions(dm);
-        for (DistributedRegion r : regions) {
-          if (r != null) {
-            if (this.allRegions && r.doesNotDistribute()) {
-              // no need to flush a region that does no distribution
-              continue;
+        try {
+          // wait for inflight operations to the aeqs even if the recipient is 
the primary
+          Set<DistributedRegion> regions = getRegions(dm);
+          for (DistributedRegion r : regions) {
+            if (r != null) {
+              if (this.allRegions && r.doesNotDistribute()) {
+                // no need to flush a region that does no distribution
+                continue;
+              }
+              waitForCurrentOperations(r, r.isInitialized());
             }
-            waitForCurrentOperations(r, r.isInitialized());
           }
+        } catch (CancelException ignore) {
+          // cache is closed - no distribution advisor available for the 
region so nothing to do but
+          // send the stabilization message
+        } catch (Exception e) {
+          logger.fatal(LocalizedMessage.create(
+              
LocalizedStrings.StateFlushOperation_0__EXCEPTION_CAUGHT_WHILE_DETERMINING_CHANNEL_STATE,
+              this), e);
+        } finally {
+          // no need to send a relay request to this process - just send the
+          // ack back to the sender
+          StateStabilizedMessage ga = new StateStabilizedMessage();
+          ga.sendingMember = relayRecipient;
+          ga.setRecipient(this.getSender());
+          ga.setProcessorId(processorId);
+          if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
+            logger.trace(LogMarker.STATE_FLUSH_OP, "Sending {}", ga);
+          }
+          dm.putOutgoing(ga);
         }
-        // no need to send a relay request to this process - just send the
-        // ack back to the sender
-        StateStabilizedMessage ga = new StateStabilizedMessage();
-        ga.sendingMember = relayRecipient;
-        ga.setRecipient(this.getSender());
-        ga.setProcessorId(processorId);
-        dm.putOutgoing(ga);
       } else {
         // 1) wait for all messages based on the membership version (or older)
         // at which the sender "joined" this region to be put on the pipe
@@ -416,23 +429,6 @@ public class StateFlushOperation {
           logger.fatal(LocalizedMessage.create(
               
LocalizedStrings.StateFlushOperation_0__EXCEPTION_CAUGHT_WHILE_DETERMINING_CHANNEL_STATE,
               this), e);
-        } catch (ThreadDeath td) {
-          throw td;
-        } catch (VirtualMachineError err) {
-          SystemFailure.initiateFailure(err);
-          // If this ever returns, rethrow the error. We're poisoned
-          // now, so don't let this thread continue.
-          throw err;
-        } catch (Throwable t) {
-          // Whenever you catch Error or Throwable, you must also
-          // catch VirtualMachineError (see above). However, there is
-          // _still_ a possibility that you are dealing with a cascading
-          // error condition, so you also need to check to see if the JVM
-          // is still usable:
-          SystemFailure.checkFailure();
-          logger.fatal(LocalizedMessage.create(
-              
LocalizedStrings.StateFlushOperation_0__THROWABLE_CAUGHT_WHILE_DETERMINING_CHANNEL_STATE,
-              this), t);
         } finally {
           if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
             logger.trace(LogMarker.STATE_FLUSH_OP, "Sending {}", gr);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
index 5bec242..aa92b43 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
@@ -15,12 +15,20 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.GemFireIOException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.StateFlushOperation.StateMarkerMessage;
 import org.apache.geode.test.junit.categories.UnitTest;
 
@@ -33,4 +41,28 @@ public class StateMarkerMessageTest {
     when(mockStateMarkerMessage.getProcessorType()).thenReturn(1);
     assertThat(mockStateMarkerMessage.getProcessorType()).isEqualTo(1);
   }
+
+  @Test
+  public void testProcessWithWaitForCurrentOperationsThatTimesOut() {
+    InternalDistributedMember relayRecipient = 
mock(InternalDistributedMember.class);
+    DistributionManager dm = mock(DistributionManager.class);
+    InternalCache gfc = mock(InternalCache.class);
+    DistributedRegion region = mock(DistributedRegion.class);
+    CacheDistributionAdvisor distributionAdvisor = 
mock(CacheDistributionAdvisor.class);
+
+    when(dm.getDistributionManagerId()).thenReturn(relayRecipient);
+    when(dm.getExistingCache()).thenReturn(gfc);
+    when(region.isInitialized()).thenReturn(true);
+    when(region.getDistributionAdvisor()).thenReturn(distributionAdvisor);
+    when(gfc.getRegionByPathForProcessing(any())).thenReturn(region);
+    doThrow(new GemFireIOException("expected in fatal log 
message")).when(distributionAdvisor)
+        .waitForCurrentOperations();
+
+    StateMarkerMessage message = new StateMarkerMessage();
+    message.relayRecipient = relayRecipient;
+
+    message.process(dm);
+
+    verify(dm, times(1)).putOutgoing(any());
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to