[
https://issues.apache.org/jira/browse/GEODE-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316886#comment-16316886
]
ASF GitHub Bot commented on GEODE-4051:
---------------------------------------
dschneider-pivotal closed pull request #1239: GEODE-4051: change
StateMarkerMessage to always reply
URL: https://github.com/apache/geode/pull/1239
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index fc3f07555f..c56fa265bc 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -347,24 +347,37 @@ private DistributedRegion getRegion(DistributionManager
dm) {
protected void process(DistributionManager dm) {
logger.trace(LogMarker.STATE_FLUSH_OP, "Processing {}", this);
if (dm.getDistributionManagerId().equals(relayRecipient)) {
- // wait for inflight operations to the aeqs even if the recipient is
the primary
- Set<DistributedRegion> regions = getRegions(dm);
- for (DistributedRegion r : regions) {
- if (r != null) {
- if (this.allRegions && r.doesNotDistribute()) {
- // no need to flush a region that does no distribution
- continue;
+ try {
+ // wait for inflight operations to the aeqs even if the recipient is
the primary
+ Set<DistributedRegion> regions = getRegions(dm);
+ for (DistributedRegion r : regions) {
+ if (r != null) {
+ if (this.allRegions && r.doesNotDistribute()) {
+ // no need to flush a region that does no distribution
+ continue;
+ }
+ waitForCurrentOperations(r, r.isInitialized());
}
- waitForCurrentOperations(r, r.isInitialized());
}
+ } catch (CancelException ignore) {
+ // cache is closed - no distribution advisor available for the
region so nothing to do but
+ // send the stabilization message
+ } catch (Exception e) {
+ logger.fatal(LocalizedMessage.create(
+
LocalizedStrings.StateFlushOperation_0__EXCEPTION_CAUGHT_WHILE_DETERMINING_CHANNEL_STATE,
+ this), e);
+ } finally {
+ // no need to send a relay request to this process - just send the
+ // ack back to the sender
+ StateStabilizedMessage ga = new StateStabilizedMessage();
+ ga.sendingMember = relayRecipient;
+ ga.setRecipient(this.getSender());
+ ga.setProcessorId(processorId);
+ if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
+ logger.trace(LogMarker.STATE_FLUSH_OP, "Sending {}", ga);
+ }
+ dm.putOutgoing(ga);
}
- // no need to send a relay request to this process - just send the
- // ack back to the sender
- StateStabilizedMessage ga = new StateStabilizedMessage();
- ga.sendingMember = relayRecipient;
- ga.setRecipient(this.getSender());
- ga.setProcessorId(processorId);
- dm.putOutgoing(ga);
} else {
// 1) wait for all messages based on the membership version (or older)
// at which the sender "joined" this region to be put on the pipe
@@ -416,23 +429,6 @@ protected void process(DistributionManager dm) {
logger.fatal(LocalizedMessage.create(
LocalizedStrings.StateFlushOperation_0__EXCEPTION_CAUGHT_WHILE_DETERMINING_CHANNEL_STATE,
this), e);
- } catch (ThreadDeath td) {
- throw td;
- } catch (VirtualMachineError err) {
- SystemFailure.initiateFailure(err);
- // If this ever returns, rethrow the error. We're poisoned
- // now, so don't let this thread continue.
- throw err;
- } catch (Throwable t) {
- // Whenever you catch Error or Throwable, you must also
- // catch VirtualMachineError (see above). However, there is
- // _still_ a possibility that you are dealing with a cascading
- // error condition, so you also need to check to see if the JVM
- // is still usable:
- SystemFailure.checkFailure();
- logger.fatal(LocalizedMessage.create(
-
LocalizedStrings.StateFlushOperation_0__THROWABLE_CAUGHT_WHILE_DETERMINING_CHANNEL_STATE,
- this), t);
} finally {
if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
logger.trace(LogMarker.STATE_FLUSH_OP, "Sending {}", gr);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
index 5bec242dcb..aa92b43c89 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/StateMarkerMessageTest.java
@@ -15,12 +15,20 @@
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.GemFireIOException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.StateFlushOperation.StateMarkerMessage;
import org.apache.geode.test.junit.categories.UnitTest;
@@ -33,4 +41,28 @@ public void shouldBeMockable() throws Exception {
when(mockStateMarkerMessage.getProcessorType()).thenReturn(1);
assertThat(mockStateMarkerMessage.getProcessorType()).isEqualTo(1);
}
+
+ @Test
+ public void testProcessWithWaitForCurrentOperationsThatTimesOut() {
+ InternalDistributedMember relayRecipient =
mock(InternalDistributedMember.class);
+ DistributionManager dm = mock(DistributionManager.class);
+ InternalCache gfc = mock(InternalCache.class);
+ DistributedRegion region = mock(DistributedRegion.class);
+ CacheDistributionAdvisor distributionAdvisor =
mock(CacheDistributionAdvisor.class);
+
+ when(dm.getDistributionManagerId()).thenReturn(relayRecipient);
+ when(dm.getExistingCache()).thenReturn(gfc);
+ when(region.isInitialized()).thenReturn(true);
+ when(region.getDistributionAdvisor()).thenReturn(distributionAdvisor);
+ when(gfc.getRegionByPathForProcessing(any())).thenReturn(region);
+ doThrow(new GemFireIOException("expected in fatal log
message")).when(distributionAdvisor)
+ .waitForCurrentOperations();
+
+ StateMarkerMessage message = new StateMarkerMessage();
+ message.relayRecipient = relayRecipient;
+
+ message.process(dm);
+
+ verify(dm, times(1)).putOutgoing(any());
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Two server JVMs crashed at the same time, which caused some primary and
> redundant buckets to be cleared. Some buckets then became locked and could
> not recover even after bouncing all servers
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: GEODE-4051
> URL: https://issues.apache.org/jira/browse/GEODE-4051
> Project: Geode
> Issue Type: Bug
> Components: regions
> Affects Versions: 1.2.0
> Reporter: Igor Barchak
> Assignee: Darrel Schneider
>
> "Pooled Waiting Message Processor 5" tid=0x162
> java.lang.Thread.State: TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> - waiting on java.util.concurrent.CountDownLatch$Sync@1993a5
> at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
> at
> org.apache.geode.internal.util.concurrent.StoppableCountDownLatch.await(StoppableCountDownLatch.java:64)
> at
> org.apache.geode.distributed.internal.ReplyProcessor21.basicWait(ReplyProcessor21.java:715)
> at
> org.apache.geode.distributed.internal.ReplyProcessor21.waitForReplies(ReplyProcessor21.java:644)
> at
> org.apache.geode.distributed.internal.ReplyProcessor21.waitForReplies(ReplyProcessor21.java:624)
> at
> org.apache.geode.distributed.internal.ReplyProcessor21.waitForReplies(ReplyProcessor21.java:519)
> at
> org.apache.geode.internal.cache.StateFlushOperation.flush(StateFlushOperation.java:243)
> at
> org.apache.geode.internal.cache.InitialImageOperation.getFromOne(InitialImageOperation.java:349)
> at
> org.apache.geode.internal.cache.DistributedRegion.getInitialImageAndRecovery(DistributedRegion.java:1168)
> at
> org.apache.geode.internal.cache.DistributedRegion.initialize(DistributedRegion.java:1023)
> at
> org.apache.geode.internal.cache.BucketRegion.initialize(BucketRegion.java:253)
> at
> org.apache.geode.internal.cache.LocalRegion.createSubregion(LocalRegion.java:962)
> at
> org.apache.geode.internal.cache.PartitionedRegionDataStore.createBucketRegion(PartitionedRegionDataStore.java:726)
> at
> org.apache.geode.internal.cache.PartitionedRegionDataStore.grabFreeBucket(PartitionedRegionDataStore.java:414)
> - locked org.apache.geode.internal.cache.ProxyBucketRegion@6820a0b6
> at
> org.apache.geode.internal.cache.PartitionedRegionDataStore.grabFreeBucketRecursively(PartitionedRegionDataStore.java:272)
> at
> org.apache.geode.internal.cache.PartitionedRegionDataStore.grabBucket(PartitionedRegionDataStore.java:2815)
> at
> org.apache.geode.internal.cache.partitioned.ManageBackupBucketMessage.operateOnPartitionedRegion(ManageBackupBucketMessage.java:148)
> at
> org.apache.geode.internal.cache.partitioned.PartitionMessage.process(PartitionMessage.java:332)
> This seems to have been introduced by this fix:
> https://github.com/apache/geode/commit/3a1062e245b3ded52ea3f6b6de0aff94ce846fa3?diff=split
> See StateMarkerMessage.process.
> The first branch of the if statement doesn't have a finally block, whereas
> the else branch does. As a result, if an exception is thrown in the first
> branch, no reply is sent and the sender keeps waiting for replies (see the
> stack trace above).
> The first branch didn't have a 'waitFor' operation earlier - it was
> introduced in that commit.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)