This is an automated email from the ASF dual-hosted git repository. boglesby pushed a commit to branch feature/GEODE-3730 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 1c446b3e6fde16ff115afce4aaeab9b46d828223 Author: Barry Oglesby <[email protected]> AuthorDate: Mon Oct 2 12:50:59 2017 -0700 GEODE-3730: Moved retry logic to receiver when gemfire.GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION=false --- .../sockets/command/GatewayReceiverCommand.java | 352 +++++++++------------ .../wan/GatewaySenderEventRemoteDispatcher.java | 28 +- .../KeepEventsOnGatewaySenderQueueDUnitTest.java | 109 +++++++ 3 files changed, 271 insertions(+), 218 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java index 12e494a..0b928f2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java @@ -98,9 +98,6 @@ public class GatewayReceiverCommand extends BaseCommand { stats.incReadProcessBatchRequestTime(start - oldStart); } Part callbackArgExistsPart; - // Get early ack flag. This test should eventually be moved up above this switch - // statement so that all messages can take advantage of it. - boolean earlyAck = false;// msg.getEarlyAck(); stats.incBatchSize(clientMessage.getPayloadLength()); @@ -143,33 +140,13 @@ public class GatewayReceiverCommand extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug("Received process batch request {} that will be processed.", batchId); } - // If early ack mode, acknowledge right away - // Not sure if earlyAck makes sense with sliding window - if (earlyAck) { - serverConnection.incrementLatestBatchIdReplied(batchId); - - // writeReply(msg, servConn); - // servConn.setAsTrue(RESPONDED); - { - long oldStart = start; - start = DistributionStats.getStatTime(); - stats.incWriteProcessBatchResponseTime(start - oldStart); - } - stats.incEarlyAcks(); - } if (logger.isDebugEnabled()) { logger.debug( "{}: Received process batch request {} containing {} events ({} bytes) with {} acknowledgement on {}", serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), - (earlyAck ? "early" : "normal"), serverConnection.getSocketString()); - if (earlyAck) { - logger.debug( - "{}: Sent process batch early response for batch {} containing {} events ({} bytes) with {} acknowledgement on {}", - serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), - (earlyAck ? "early" : "normal"), serverConnection.getSocketString()); - } + "normal", serverConnection.getSocketString()); } // logger.warn("Received process batch request " + batchId + " containing // " + numberOfEvents + " events (" + msg.getPayloadLength() + " bytes) with @@ -190,14 +167,12 @@ public class GatewayReceiverCommand extends BaseCommand { boolean removeOnException = clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1 ? true : false; - // Keep track of whether a response has been written for - // exceptions - boolean wroteResponse = earlyAck; // event received in batch also have PDX events at the start of the batch,to // represent correct index on which the exception occurred, number of PDX // events need to be subtratced. int indexWithoutPDXEvent = -1; // for (int i = 0; i < numberOfEvents; i++) { + boolean retry = true; boolean isPdxEvent = false; indexWithoutPDXEvent++; // System.out.println("Processing event " + i + " in batch " + batchId + " @@ -330,56 +305,59 @@ public class GatewayReceiverCommand extends BaseCommand { logger.warn(s); throw new Exception(s); } - region = (LocalRegion) crHelper.getRegion(regionName); - if (region == null) { - handleRegionNull(serverConnection, regionName, batchId); - } else { - clientEvent = new EventIDHolder(eventId); - if (versionTimeStamp > 0) { - VersionTag tag = VersionTag.create(region.getVersionMember()); - tag.setIsGatewayTag(true); - tag.setVersionTimeStamp(versionTimeStamp); - tag.setDistributedSystemId(dsid); - clientEvent.setVersionTag(tag); - } - clientEvent.setPossibleDuplicate(possibleDuplicate); - handleMessageRetry(region, clientEvent); + do { try { - byte[] value = valuePart.getSerializedForm(); - boolean isObject = valuePart.isObject(); - // [sumedh] This should be done on client while sending - // since that is the WAN gateway - AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); - if (authzRequest != null) { - PutOperationContext putContext = - authzRequest.putAuthorize(regionName, key, value, isObject, callbackArg); - value = putContext.getSerializedValue(); - isObject = putContext.isObject(); - } - // Attempt to create the entry - boolean result = false; - if (isPdxEvent) { - result = addPdxType(crHelper, key, value); + region = (LocalRegion) crHelper.getRegion(regionName); + if (region == null) { + handleRegionNull(serverConnection, regionName, batchId); } else { - result = region.basicBridgeCreate(key, value, isObject, callbackArg, - serverConnection.getProxyID(), false, clientEvent, false); - // If the create fails (presumably because it already exists), - // attempt to update the entry - if (!result) { - result = region.basicBridgePut(key, value, null, isObject, callbackArg, - serverConnection.getProxyID(), false, clientEvent); + clientEvent = new EventIDHolder(eventId); + if (versionTimeStamp > 0) { + VersionTag tag = VersionTag.create(region.getVersionMember()); + tag.setIsGatewayTag(true); + tag.setVersionTimeStamp(versionTimeStamp); + tag.setDistributedSystemId(dsid); + clientEvent.setVersionTag(tag); + } + clientEvent.setPossibleDuplicate(possibleDuplicate); + handleMessageRetry(region, clientEvent); + byte[] value = valuePart.getSerializedForm(); + boolean isObject = valuePart.isObject(); + // [sumedh] This should be done on client while sending + // since that is the WAN gateway + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); + if (authzRequest != null) { + PutOperationContext putContext = + authzRequest.putAuthorize(regionName, key, value, isObject, callbackArg); + value = putContext.getSerializedValue(); + isObject = putContext.isObject(); + } + // Attempt to create the entry + boolean result = false; + if (isPdxEvent) { + result = addPdxType(crHelper, key, value); + } else { + result = region.basicBridgeCreate(key, value, isObject, callbackArg, + serverConnection.getProxyID(), false, clientEvent, false); + // If the create fails (presumably because it already exists), + // attempt to update the entry + if (!result) { + result = region.basicBridgePut(key, value, null, isObject, callbackArg, + serverConnection.getProxyID(), false, clientEvent); + } } - } - if (result || clientEvent.isConcurrencyConflict()) { - serverConnection.setModificationInfo(true, regionName, key); - stats.incCreateRequest(); - } else { - // This exception will be logged in the catch block below - throw new Exception( - LocalizedStrings.ProcessBatch_0_FAILED_TO_CREATE_OR_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_CALLBACKARG_4 - .toLocalizedString(new Object[] {serverConnection.getName(), regionName, - key, valuePart, callbackArg})); + if (result || clientEvent.isConcurrencyConflict()) { + serverConnection.setModificationInfo(true, regionName, key); + stats.incCreateRequest(); + retry = false; + } else { + // This exception will be logged in the catch block below + throw new Exception( + LocalizedStrings.ProcessBatch_0_FAILED_TO_CREATE_OR_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_CALLBACKARG_4 + .toLocalizedString(new Object[] {serverConnection.getName(), regionName, + key, valuePart, callbackArg})); + } } } catch (Exception e) { logger.warn(LocalizedMessage.create( @@ -387,9 +365,9 @@ public class GatewayReceiverCommand extends BaseCommand { new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); - throw e; + handleException(removeOnException, e); } - } + } while (retry); break; case 1: // Update /* @@ -450,66 +428,52 @@ public class GatewayReceiverCommand extends BaseCommand { logger.warn(s); throw new Exception(s); } - region = (LocalRegion) crHelper.getRegion(regionName); - if (region == null) { - handleRegionNull(serverConnection, regionName, batchId); - } else { - clientEvent = new EventIDHolder(eventId); - if (versionTimeStamp > 0) { - VersionTag tag = VersionTag.create(region.getVersionMember()); - tag.setIsGatewayTag(true); - tag.setVersionTimeStamp(versionTimeStamp); - tag.setDistributedSystemId(dsid); - clientEvent.setVersionTag(tag); - } - clientEvent.setPossibleDuplicate(possibleDuplicate); - handleMessageRetry(region, clientEvent); + do { try { - byte[] value = valuePart.getSerializedForm(); - boolean isObject = valuePart.isObject(); - AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); - if (authzRequest != null) { - PutOperationContext putContext = authzRequest.putAuthorize(regionName, key, value, - isObject, callbackArg, PutOperationContext.UPDATE); - value = putContext.getSerializedValue(); - isObject = putContext.isObject(); - } - boolean result = false; - if (isPdxEvent) { - result = addPdxType(crHelper, key, value); + region = (LocalRegion) crHelper.getRegion(regionName); + if (region == null) { + handleRegionNull(serverConnection, regionName, batchId); } else { - result = region.basicBridgePut(key, value, null, isObject, callbackArg, - serverConnection.getProxyID(), false, clientEvent); - } - if (result || clientEvent.isConcurrencyConflict()) { - serverConnection.setModificationInfo(true, regionName, key); - stats.incUpdateRequest(); - } else { - final Object[] msgArgs = new Object[] {serverConnection.getName(), regionName, - key, valuePart, callbackArg}; - final StringId message = - LocalizedStrings.ProcessBatch_0_FAILED_TO_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_AND_CALLBACKARG_4; - String s = message.toLocalizedString(msgArgs); - logger.info(s); - throw new Exception(s); - } - } catch (CancelException e) { - // FIXME better exception hierarchy would avoid this check - if (serverConnection.getCachedRegionHelper().getCache().getCancelCriterion() - .isCancelInProgress()) { - if (logger.isDebugEnabled()) { - logger.debug( - "{} ignoring message of type {} from client {} because shutdown occurred during message processing.", - serverConnection.getName(), - MessageType.getString(clientMessage.getMessageType()), - serverConnection.getProxyID()); + clientEvent = new EventIDHolder(eventId); + if (versionTimeStamp > 0) { + VersionTag tag = VersionTag.create(region.getVersionMember()); + tag.setIsGatewayTag(true); + tag.setVersionTimeStamp(versionTimeStamp); + tag.setDistributedSystemId(dsid); + clientEvent.setVersionTag(tag); + } + clientEvent.setPossibleDuplicate(possibleDuplicate); + handleMessageRetry(region, clientEvent); + byte[] value = valuePart.getSerializedForm(); + boolean isObject = valuePart.isObject(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); + if (authzRequest != null) { + PutOperationContext putContext = authzRequest.putAuthorize(regionName, key, + value, isObject, callbackArg, PutOperationContext.UPDATE); + value = putContext.getSerializedValue(); + isObject = putContext.isObject(); + } + boolean result = false; + if (isPdxEvent) { + result = addPdxType(crHelper, key, value); + } else { + result = region.basicBridgePut(key, value, null, isObject, callbackArg, + serverConnection.getProxyID(), false, clientEvent); + } + if (result || clientEvent.isConcurrencyConflict()) { + serverConnection.setModificationInfo(true, regionName, key); + stats.incUpdateRequest(); + retry = false; + } else { + final Object[] msgArgs = new Object[] {serverConnection.getName(), regionName, + key, valuePart, callbackArg}; + final StringId message = + LocalizedStrings.ProcessBatch_0_FAILED_TO_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_AND_CALLBACKARG_4; + String s = message.toLocalizedString(msgArgs); + logger.info(s); + throw new Exception(s); } - serverConnection.setFlagProcessMessagesAsFalse(); - serverConnection.setClientDisconnectedException(e); - } else { - throw e; } - return; } catch (Exception e) { // Preserve the connection under all circumstances logger.warn(LocalizedMessage.create( @@ -517,9 +481,9 @@ public class GatewayReceiverCommand extends BaseCommand { new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); - throw e; + handleException(removeOnException, e); } - } + } while (retry); break; case 2: // Destroy // Retrieve the callbackArg from the message parts if necessary @@ -566,38 +530,48 @@ public class GatewayReceiverCommand extends BaseCommand { logger.warn(s); throw new Exception(s); } - region = (LocalRegion) crHelper.getRegion(regionName); - if (region == null) { - handleRegionNull(serverConnection, regionName, batchId); - } else { - clientEvent = new EventIDHolder(eventId); - if (versionTimeStamp > 0) { - VersionTag tag = VersionTag.create(region.getVersionMember()); - tag.setIsGatewayTag(true); - tag.setVersionTimeStamp(versionTimeStamp); - tag.setDistributedSystemId(dsid); - clientEvent.setVersionTag(tag); - } - handleMessageRetry(region, clientEvent); - // Destroy the entry + do { try { - AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); - if (authzRequest != null) { - DestroyOperationContext destroyContext = - authzRequest.destroyAuthorize(regionName, key, callbackArg); - callbackArg = destroyContext.getCallbackArg(); + region = (LocalRegion) crHelper.getRegion(regionName); + if (region == null) { + handleRegionNull(serverConnection, regionName, batchId); + } else { + clientEvent = new EventIDHolder(eventId); + if (versionTimeStamp > 0) { + VersionTag tag = VersionTag.create(region.getVersionMember()); + tag.setIsGatewayTag(true); + tag.setVersionTimeStamp(versionTimeStamp); + tag.setDistributedSystemId(dsid); + clientEvent.setVersionTag(tag); + } + handleMessageRetry(region, clientEvent); + // Destroy the entry + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); + if (authzRequest != null) { + DestroyOperationContext destroyContext = + authzRequest.destroyAuthorize(regionName, key, callbackArg); + callbackArg = destroyContext.getCallbackArg(); + } + region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), false, + clientEvent); + serverConnection.setModificationInfo(true, regionName, key); + stats.incDestroyRequest(); + retry = false; } - region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), false, - clientEvent); - serverConnection.setModificationInfo(true, regionName, key); - stats.incDestroyRequest(); } catch (EntryNotFoundException e) { logger.info(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_DURING_BATCH_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1, new Object[] {serverConnection.getName(), key})); // throw new Exception(e); + } catch (Exception e) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS, + new Object[] {serverConnection.getName(), Integer.valueOf(batchId), + Integer.valueOf(numberOfEvents)}), + e); + handleException(removeOnException, e); } - } + } while (retry); break; case 3: // Update Time-stamp for a RegionEntry @@ -715,38 +689,16 @@ public class GatewayReceiverCommand extends BaseCommand { break; } - // logger.warn("Caught exception for batch " + batchId + " containing - // " + numberOfEvents + " events (" + msg.getPayloadLength() + " bytes) - // with " + (earlyAck ? "early" : "normal") + " acknowledgement on " + - // getSocketString()); - // If the response has not already been written (it is not - // early ack mode), increment the latest batch id replied, - // write the batch exception to the caller and break - if (!wroteResponse) { - // Increment the batch id unless the received batch id is -1 (a - // failover batch) - DistributedSystem ds = crHelper.getCache().getDistributedSystem(); - String exceptionMessage = - LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_PROCESSING_BATCH.toLocalizedString( - new Object[] {((InternalDistributedSystem) ds).getDistributionManager() - .getDistributedSystemId(), ds.getDistributedMember()}); - BatchException70 be = - new BatchException70(exceptionMessage, e, indexWithoutPDXEvent, batchId); - exceptions.add(be); - if (!removeOnException) { - break; - } - - // servConn.setAsTrue(RESPONDED); - // wroteResponse = true; - // break; - } else { - // If it is early ack mode, attempt to process the remaining messages - // in the batch. - // This could be problematic depending on where the exception - // occurred. - return; - } + // Increment the batch id unless the received batch id is -1 (a + // failover batch) + DistributedSystem ds = crHelper.getCache().getDistributedSystem(); + String exceptionMessage = LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_PROCESSING_BATCH + .toLocalizedString(new Object[] { + ((InternalDistributedSystem) ds).getDistributionManager().getDistributedSystemId(), + ds.getDistributedMember()}); + BatchException70 be = + new BatchException70(exceptionMessage, e, indexWithoutPDXEvent, batchId); + exceptions.add(be); } finally { // Increment the partNumber if (actionType == 0 /* create */ || actionType == 1 /* update */) { @@ -784,7 +736,7 @@ public class GatewayReceiverCommand extends BaseCommand { serverConnection.incrementLatestBatchIdReplied(batchId); writeBatchException(clientMessage, exceptions, serverConnection, batchId); serverConnection.setAsTrue(RESPONDED); - } else if (!wroteResponse) { + } else { // Increment the batch id unless the received batch id is -1 (a failover // batch) serverConnection.incrementLatestBatchIdReplied(batchId); @@ -796,12 +748,8 @@ public class GatewayReceiverCommand extends BaseCommand { logger.debug( "{}: Sent process batch normal response for batch {} containing {} events ({} bytes) with {} acknowledgement on {}", serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), - (earlyAck ? "early" : "normal"), serverConnection.getSocketString()); + "normal", serverConnection.getSocketString()); } - // logger.warn("Sent process batch normal response for batch " + - // batchId + " containing " + numberOfEvents + " events (" + - // msg.getPayloadLength() + " bytes) with " + (earlyAck ? "early" : - // "normal") + " acknowledgement on " + getSocketString()); } } @@ -818,6 +766,20 @@ public class GatewayReceiverCommand extends BaseCommand { return true; } + private void handleException(boolean removeOnException, Exception e) throws Exception { + if (shouldThrowException(removeOnException, e)) { + throw e; + } else { + Thread.sleep(500); + } + } + + private boolean shouldThrowException(boolean removeOnException, Exception e) { + // Split out in case specific exceptions would short-circuit retry logic. + // Currently it just considers the boolean. + return removeOnException; + } + private void handleMessageRetry(LocalRegion region, EntryEventImpl clientEvent) { if (clientEvent.isPossibleDuplicate()) { if (region.getAttributes().getConcurrencyChecksEnabled()) { diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index 6c99168..5bdfd6a 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -622,36 +622,18 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis // to resend all the pdx events as well in the next batch. final GatewaySenderStats statistics = sender.getStatistics(); statistics.incBatchesRedistributed(); - // log batch exceptions and remove all the events if remove from - // exception is true - // do not remove if it is false if (sender.isRemoveFromQueueOnException()) { // log the batchExceptions logBatchExceptions(ack.getBatchException()); processor.handleSuccessBatchAck(batchId); } else { - // we assume that batch exception will not occur for PDX related - // events - List<GatewaySenderEventImpl> pdxEvents = - processor.getBatchIdToPDXEventsMap().get(ack.getBatchException().getBatchId()); - if (pdxEvents != null) { - for (GatewaySenderEventImpl senderEvent : pdxEvents) { - senderEvent.isAcked = true; - } - } - // log the batchExceptions + // log the batchExceptions. These are exceptions that were not retried on the remote + // site (e.g. NotAuthorizedException) + // @TODO Shoud anything else be done here to warn that events are lost even though + // the boolean is false logBatchExceptions(ack.getBatchException()); - // remove the events that have been processed. - BatchException70 be = ack.getBatchException(); - List<BatchException70> exceptions = be.getExceptions(); - - for (int i = 0; i < exceptions.get(0).getIndex(); i++) { - processor.eventQueueRemove(1); - } - // reset the sender - processor.handleException(); + processor.handleSuccessBatchAck(batchId); } - } // unsuccessful batch else { // The batch was successful. if (logger.isDebugEnabled()) { diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java new file mode 100644 index 0000000..6c89f0f --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.misc; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.WANTestBase; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase { + + public KeepEventsOnGatewaySenderQueueDUnitTest() { + super(); + } + + @Test + public void testBasicKeepEventsOnGatewaySenderQueue() throws Exception { + // Start locators + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm2.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + String regionName = getTestMethodName() + "_PR"; + String senderId = "ny"; + + // Configure receiving site members + createCacheInVMs(nyPort, vm3, vm4); + createReceiverInVMs(vm3, vm4); + vm3.invoke(() -> createPartitionedRegionWithPersistence(regionName, null, 0, 100)); + vm4.invoke(() -> createPartitionedRegionWithPersistence(regionName, null, 0, 100)); + vm4.invoke(() -> assignBuckets(regionName)); + + // Configure sending site members + createCacheInVMs(lnPort, vm1); + vm1.invoke(() -> createSender(senderId, 2, true, 100, 10, false, true, null, false)); + vm1.invoke(() -> disableRemoveFromQueueOnException(senderId)); + vm1.invoke(() -> createPartitionedRegionWithPersistence(regionName, senderId, 0, 100)); + + // Asynchronously do puts in sending site member + AsyncInvocation<Integer> putInvocation = vm1.invokeAsync(() -> doPuts(regionName, 60000l)); + + // Repeatedly bounce a receiving site member which will cause PartitionOfflineExceptions + AsyncInvocation<Integer> closeOpenInvocation = + vm3.invokeAsync(() -> closeRecreateCache(nyPort, regionName, 3)); + + // Once puts are complete, wait for queue to be empty + int numPuts = putInvocation.get(120, TimeUnit.SECONDS); + vm1.invoke(() -> validateQueueSizeStat(senderId, 0)); + + // Once the receiving site member bounce has completed, verify region sizes in both sites + closeOpenInvocation.join(120000); + vm1.invoke(() -> validateRegionSize(regionName, numPuts)); + vm3.invoke(() -> validateRegionSize(regionName, numPuts)); + vm4.invoke(() -> validateRegionSize(regionName, numPuts)); + } + + private void disableRemoveFromQueueOnException(String senderId) { + AbstractGatewaySender ags = (AbstractGatewaySender) cache.getGatewaySender(senderId); + ags.setRemoveFromQueueOnException(false); + } + + private void assignBuckets(String regionName) { + Region region = cache.getRegion(regionName); + PartitionRegionHelper.assignBucketsToPartitions(region); + } + + private int doPuts(String regionName, long timeToRun) throws Exception { + int numPuts = 0; + Region region = cache.getRegion(regionName); + long end = System.currentTimeMillis() + timeToRun; + while (System.currentTimeMillis() < end) { + region.put(UUID.randomUUID(), 0); + numPuts++; + Thread.sleep(10); + } + return numPuts; + } + + private void closeRecreateCache(int locatorPort, String regionName, int iterations) + throws Exception { + for (int i = 0; i < iterations; i++) { + closeCache(); + Thread.sleep(5000); + createCache(locatorPort); + createReceiver(); + createPartitionedRegionWithPersistence(regionName, null, 0, 100); + } + } +} -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
