This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-3730
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 1c446b3e6fde16ff115afce4aaeab9b46d828223
Author: Barry Oglesby <[email protected]>
AuthorDate: Mon Oct 2 12:50:59 2017 -0700

    GEODE-3730: Moved retry logic to receiver when gemfire.GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION=false
---
 .../sockets/command/GatewayReceiverCommand.java    | 352 +++++++++------------
 .../wan/GatewaySenderEventRemoteDispatcher.java    |  28 +-
 .../KeepEventsOnGatewaySenderQueueDUnitTest.java   | 109 +++++++
 3 files changed, 271 insertions(+), 218 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 12e494a..0b928f2 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -98,9 +98,6 @@ public class GatewayReceiverCommand extends BaseCommand {
       stats.incReadProcessBatchRequestTime(start - oldStart);
     }
     Part callbackArgExistsPart;
-    // Get early ack flag. This test should eventually be moved up above this 
switch
-    // statement so that all messages can take advantage of it.
-    boolean earlyAck = false;// msg.getEarlyAck();
 
     stats.incBatchSize(clientMessage.getPayloadLength());
 
@@ -143,33 +140,13 @@ public class GatewayReceiverCommand extends BaseCommand {
     if (logger.isDebugEnabled()) {
       logger.debug("Received process batch request {} that will be 
processed.", batchId);
     }
-    // If early ack mode, acknowledge right away
-    // Not sure if earlyAck makes sense with sliding window
-    if (earlyAck) {
-      serverConnection.incrementLatestBatchIdReplied(batchId);
-
-      // writeReply(msg, servConn);
-      // servConn.setAsTrue(RESPONDED);
-      {
-        long oldStart = start;
-        start = DistributionStats.getStatTime();
-        stats.incWriteProcessBatchResponseTime(start - oldStart);
-      }
-      stats.incEarlyAcks();
-    }
 
 
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: Received process batch request {} containing {} events ({} 
bytes) with {} acknowledgement on {}",
           serverConnection.getName(), batchId, numberOfEvents, 
clientMessage.getPayloadLength(),
-          (earlyAck ? "early" : "normal"), serverConnection.getSocketString());
-      if (earlyAck) {
-        logger.debug(
-            "{}: Sent process batch early response for batch {} containing {} 
events ({} bytes) with {} acknowledgement on {}",
-            serverConnection.getName(), batchId, numberOfEvents, 
clientMessage.getPayloadLength(),
-            (earlyAck ? "early" : "normal"), 
serverConnection.getSocketString());
-      }
+          "normal", serverConnection.getSocketString());
     }
     // logger.warn("Received process batch request " + batchId + " containing
     // " + numberOfEvents + " events (" + msg.getPayloadLength() + " bytes) 
with
@@ -190,14 +167,12 @@ public class GatewayReceiverCommand extends BaseCommand {
     boolean removeOnException =
         clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1 ? true 
: false;
 
-    // Keep track of whether a response has been written for
-    // exceptions
-    boolean wroteResponse = earlyAck;
     // event received in batch also have PDX events at the start of the 
batch,to
     // represent correct index on which the exception occurred, number of PDX
     // events need to be subtratced.
     int indexWithoutPDXEvent = -1; //
     for (int i = 0; i < numberOfEvents; i++) {
+      boolean retry = true;
       boolean isPdxEvent = false;
       indexWithoutPDXEvent++;
       // System.out.println("Processing event " + i + " in batch " + batchId + 
"
@@ -330,56 +305,59 @@ public class GatewayReceiverCommand extends BaseCommand {
               logger.warn(s);
               throw new Exception(s);
             }
-            region = (LocalRegion) crHelper.getRegion(regionName);
-            if (region == null) {
-              handleRegionNull(serverConnection, regionName, batchId);
-            } else {
-              clientEvent = new EventIDHolder(eventId);
-              if (versionTimeStamp > 0) {
-                VersionTag tag = VersionTag.create(region.getVersionMember());
-                tag.setIsGatewayTag(true);
-                tag.setVersionTimeStamp(versionTimeStamp);
-                tag.setDistributedSystemId(dsid);
-                clientEvent.setVersionTag(tag);
-              }
-              clientEvent.setPossibleDuplicate(possibleDuplicate);
-              handleMessageRetry(region, clientEvent);
+            do {
               try {
-                byte[] value = valuePart.getSerializedForm();
-                boolean isObject = valuePart.isObject();
-                // [sumedh] This should be done on client while sending
-                // since that is the WAN gateway
-                AuthorizeRequest authzRequest = 
serverConnection.getAuthzRequest();
-                if (authzRequest != null) {
-                  PutOperationContext putContext =
-                      authzRequest.putAuthorize(regionName, key, value, 
isObject, callbackArg);
-                  value = putContext.getSerializedValue();
-                  isObject = putContext.isObject();
-                }
-                // Attempt to create the entry
-                boolean result = false;
-                if (isPdxEvent) {
-                  result = addPdxType(crHelper, key, value);
+                region = (LocalRegion) crHelper.getRegion(regionName);
+                if (region == null) {
+                  handleRegionNull(serverConnection, regionName, batchId);
                 } else {
-                  result = region.basicBridgeCreate(key, value, isObject, 
callbackArg,
-                      serverConnection.getProxyID(), false, clientEvent, 
false);
-                  // If the create fails (presumably because it already 
exists),
-                  // attempt to update the entry
-                  if (!result) {
-                    result = region.basicBridgePut(key, value, null, isObject, 
callbackArg,
-                        serverConnection.getProxyID(), false, clientEvent);
+                  clientEvent = new EventIDHolder(eventId);
+                  if (versionTimeStamp > 0) {
+                    VersionTag tag = 
VersionTag.create(region.getVersionMember());
+                    tag.setIsGatewayTag(true);
+                    tag.setVersionTimeStamp(versionTimeStamp);
+                    tag.setDistributedSystemId(dsid);
+                    clientEvent.setVersionTag(tag);
+                  }
+                  clientEvent.setPossibleDuplicate(possibleDuplicate);
+                  handleMessageRetry(region, clientEvent);
+                  byte[] value = valuePart.getSerializedForm();
+                  boolean isObject = valuePart.isObject();
+                  // [sumedh] This should be done on client while sending
+                  // since that is the WAN gateway
+                  AuthorizeRequest authzRequest = 
serverConnection.getAuthzRequest();
+                  if (authzRequest != null) {
+                    PutOperationContext putContext =
+                        authzRequest.putAuthorize(regionName, key, value, 
isObject, callbackArg);
+                    value = putContext.getSerializedValue();
+                    isObject = putContext.isObject();
+                  }
+                  // Attempt to create the entry
+                  boolean result = false;
+                  if (isPdxEvent) {
+                    result = addPdxType(crHelper, key, value);
+                  } else {
+                    result = region.basicBridgeCreate(key, value, isObject, 
callbackArg,
+                        serverConnection.getProxyID(), false, clientEvent, 
false);
+                    // If the create fails (presumably because it already 
exists),
+                    // attempt to update the entry
+                    if (!result) {
+                      result = region.basicBridgePut(key, value, null, 
isObject, callbackArg,
+                          serverConnection.getProxyID(), false, clientEvent);
+                    }
                   }
-                }
 
-                if (result || clientEvent.isConcurrencyConflict()) {
-                  serverConnection.setModificationInfo(true, regionName, key);
-                  stats.incCreateRequest();
-                } else {
-                  // This exception will be logged in the catch block below
-                  throw new Exception(
-                      
LocalizedStrings.ProcessBatch_0_FAILED_TO_CREATE_OR_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_CALLBACKARG_4
-                          .toLocalizedString(new Object[] 
{serverConnection.getName(), regionName,
-                              key, valuePart, callbackArg}));
+                  if (result || clientEvent.isConcurrencyConflict()) {
+                    serverConnection.setModificationInfo(true, regionName, 
key);
+                    stats.incCreateRequest();
+                    retry = false;
+                  } else {
+                    // This exception will be logged in the catch block below
+                    throw new Exception(
+                        
LocalizedStrings.ProcessBatch_0_FAILED_TO_CREATE_OR_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_CALLBACKARG_4
+                            .toLocalizedString(new Object[] 
{serverConnection.getName(), regionName,
+                                key, valuePart, callbackArg}));
+                  }
                 }
               } catch (Exception e) {
                 logger.warn(LocalizedMessage.create(
@@ -387,9 +365,9 @@ public class GatewayReceiverCommand extends BaseCommand {
                     new Object[] {serverConnection.getName(), 
Integer.valueOf(batchId),
                         Integer.valueOf(numberOfEvents)}),
                     e);
-                throw e;
+                handleException(removeOnException, e);
               }
-            }
+            } while (retry);
             break;
           case 1: // Update
             /*
@@ -450,66 +428,52 @@ public class GatewayReceiverCommand extends BaseCommand {
               logger.warn(s);
               throw new Exception(s);
             }
-            region = (LocalRegion) crHelper.getRegion(regionName);
-            if (region == null) {
-              handleRegionNull(serverConnection, regionName, batchId);
-            } else {
-              clientEvent = new EventIDHolder(eventId);
-              if (versionTimeStamp > 0) {
-                VersionTag tag = VersionTag.create(region.getVersionMember());
-                tag.setIsGatewayTag(true);
-                tag.setVersionTimeStamp(versionTimeStamp);
-                tag.setDistributedSystemId(dsid);
-                clientEvent.setVersionTag(tag);
-              }
-              clientEvent.setPossibleDuplicate(possibleDuplicate);
-              handleMessageRetry(region, clientEvent);
+            do {
               try {
-                byte[] value = valuePart.getSerializedForm();
-                boolean isObject = valuePart.isObject();
-                AuthorizeRequest authzRequest = 
serverConnection.getAuthzRequest();
-                if (authzRequest != null) {
-                  PutOperationContext putContext = 
authzRequest.putAuthorize(regionName, key, value,
-                      isObject, callbackArg, PutOperationContext.UPDATE);
-                  value = putContext.getSerializedValue();
-                  isObject = putContext.isObject();
-                }
-                boolean result = false;
-                if (isPdxEvent) {
-                  result = addPdxType(crHelper, key, value);
+                region = (LocalRegion) crHelper.getRegion(regionName);
+                if (region == null) {
+                  handleRegionNull(serverConnection, regionName, batchId);
                 } else {
-                  result = region.basicBridgePut(key, value, null, isObject, 
callbackArg,
-                      serverConnection.getProxyID(), false, clientEvent);
-                }
-                if (result || clientEvent.isConcurrencyConflict()) {
-                  serverConnection.setModificationInfo(true, regionName, key);
-                  stats.incUpdateRequest();
-                } else {
-                  final Object[] msgArgs = new Object[] 
{serverConnection.getName(), regionName,
-                      key, valuePart, callbackArg};
-                  final StringId message =
-                      
LocalizedStrings.ProcessBatch_0_FAILED_TO_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_AND_CALLBACKARG_4;
-                  String s = message.toLocalizedString(msgArgs);
-                  logger.info(s);
-                  throw new Exception(s);
-                }
-              } catch (CancelException e) {
-                // FIXME better exception hierarchy would avoid this check
-                if 
(serverConnection.getCachedRegionHelper().getCache().getCancelCriterion()
-                    .isCancelInProgress()) {
-                  if (logger.isDebugEnabled()) {
-                    logger.debug(
-                        "{} ignoring message of type {} from client {} because 
shutdown occurred during message processing.",
-                        serverConnection.getName(),
-                        MessageType.getString(clientMessage.getMessageType()),
-                        serverConnection.getProxyID());
+                  clientEvent = new EventIDHolder(eventId);
+                  if (versionTimeStamp > 0) {
+                    VersionTag tag = 
VersionTag.create(region.getVersionMember());
+                    tag.setIsGatewayTag(true);
+                    tag.setVersionTimeStamp(versionTimeStamp);
+                    tag.setDistributedSystemId(dsid);
+                    clientEvent.setVersionTag(tag);
+                  }
+                  clientEvent.setPossibleDuplicate(possibleDuplicate);
+                  handleMessageRetry(region, clientEvent);
+                  byte[] value = valuePart.getSerializedForm();
+                  boolean isObject = valuePart.isObject();
+                  AuthorizeRequest authzRequest = 
serverConnection.getAuthzRequest();
+                  if (authzRequest != null) {
+                    PutOperationContext putContext = 
authzRequest.putAuthorize(regionName, key,
+                        value, isObject, callbackArg, 
PutOperationContext.UPDATE);
+                    value = putContext.getSerializedValue();
+                    isObject = putContext.isObject();
+                  }
+                  boolean result = false;
+                  if (isPdxEvent) {
+                    result = addPdxType(crHelper, key, value);
+                  } else {
+                    result = region.basicBridgePut(key, value, null, isObject, 
callbackArg,
+                        serverConnection.getProxyID(), false, clientEvent);
+                  }
+                  if (result || clientEvent.isConcurrencyConflict()) {
+                    serverConnection.setModificationInfo(true, regionName, 
key);
+                    stats.incUpdateRequest();
+                    retry = false;
+                  } else {
+                    final Object[] msgArgs = new Object[] 
{serverConnection.getName(), regionName,
+                        key, valuePart, callbackArg};
+                    final StringId message =
+                        
LocalizedStrings.ProcessBatch_0_FAILED_TO_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_AND_CALLBACKARG_4;
+                    String s = message.toLocalizedString(msgArgs);
+                    logger.info(s);
+                    throw new Exception(s);
                   }
-                  serverConnection.setFlagProcessMessagesAsFalse();
-                  serverConnection.setClientDisconnectedException(e);
-                } else {
-                  throw e;
                 }
-                return;
               } catch (Exception e) {
                 // Preserve the connection under all circumstances
                 logger.warn(LocalizedMessage.create(
@@ -517,9 +481,9 @@ public class GatewayReceiverCommand extends BaseCommand {
                     new Object[] {serverConnection.getName(), 
Integer.valueOf(batchId),
                         Integer.valueOf(numberOfEvents)}),
                     e);
-                throw e;
+                handleException(removeOnException, e);
               }
-            }
+            } while (retry);
             break;
           case 2: // Destroy
             // Retrieve the callbackArg from the message parts if necessary
@@ -566,38 +530,48 @@ public class GatewayReceiverCommand extends BaseCommand {
               logger.warn(s);
               throw new Exception(s);
             }
-            region = (LocalRegion) crHelper.getRegion(regionName);
-            if (region == null) {
-              handleRegionNull(serverConnection, regionName, batchId);
-            } else {
-              clientEvent = new EventIDHolder(eventId);
-              if (versionTimeStamp > 0) {
-                VersionTag tag = VersionTag.create(region.getVersionMember());
-                tag.setIsGatewayTag(true);
-                tag.setVersionTimeStamp(versionTimeStamp);
-                tag.setDistributedSystemId(dsid);
-                clientEvent.setVersionTag(tag);
-              }
-              handleMessageRetry(region, clientEvent);
-              // Destroy the entry
+            do {
               try {
-                AuthorizeRequest authzRequest = 
serverConnection.getAuthzRequest();
-                if (authzRequest != null) {
-                  DestroyOperationContext destroyContext =
-                      authzRequest.destroyAuthorize(regionName, key, 
callbackArg);
-                  callbackArg = destroyContext.getCallbackArg();
+                region = (LocalRegion) crHelper.getRegion(regionName);
+                if (region == null) {
+                  handleRegionNull(serverConnection, regionName, batchId);
+                } else {
+                  clientEvent = new EventIDHolder(eventId);
+                  if (versionTimeStamp > 0) {
+                    VersionTag tag = 
VersionTag.create(region.getVersionMember());
+                    tag.setIsGatewayTag(true);
+                    tag.setVersionTimeStamp(versionTimeStamp);
+                    tag.setDistributedSystemId(dsid);
+                    clientEvent.setVersionTag(tag);
+                  }
+                  handleMessageRetry(region, clientEvent);
+                  // Destroy the entry
+                  AuthorizeRequest authzRequest = 
serverConnection.getAuthzRequest();
+                  if (authzRequest != null) {
+                    DestroyOperationContext destroyContext =
+                        authzRequest.destroyAuthorize(regionName, key, 
callbackArg);
+                    callbackArg = destroyContext.getCallbackArg();
+                  }
+                  region.basicBridgeDestroy(key, callbackArg, 
serverConnection.getProxyID(), false,
+                      clientEvent);
+                  serverConnection.setModificationInfo(true, regionName, key);
+                  stats.incDestroyRequest();
+                  retry = false;
                 }
-                region.basicBridgeDestroy(key, callbackArg, 
serverConnection.getProxyID(), false,
-                    clientEvent);
-                serverConnection.setModificationInfo(true, regionName, key);
-                stats.incDestroyRequest();
               } catch (EntryNotFoundException e) {
                 logger.info(LocalizedMessage.create(
                     
LocalizedStrings.ProcessBatch_0_DURING_BATCH_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
                     new Object[] {serverConnection.getName(), key}));
                 // throw new Exception(e);
+              } catch (Exception e) {
+                logger.warn(LocalizedMessage.create(
+                    
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS,
+                    new Object[] {serverConnection.getName(), 
Integer.valueOf(batchId),
+                        Integer.valueOf(numberOfEvents)}),
+                    e);
+                handleException(removeOnException, e);
               }
-            }
+            } while (retry);
             break;
           case 3: // Update Time-stamp for a RegionEntry
 
@@ -715,38 +689,16 @@ public class GatewayReceiverCommand extends BaseCommand {
           break;
         }
 
-        // logger.warn("Caught exception for batch " + batchId + " containing
-        // " + numberOfEvents + " events (" + msg.getPayloadLength() + " bytes)
-        // with " + (earlyAck ? "early" : "normal") + " acknowledgement on " +
-        // getSocketString());
-        // If the response has not already been written (it is not
-        // early ack mode), increment the latest batch id replied,
-        // write the batch exception to the caller and break
-        if (!wroteResponse) {
-          // Increment the batch id unless the received batch id is -1 (a
-          // failover batch)
-          DistributedSystem ds = crHelper.getCache().getDistributedSystem();
-          String exceptionMessage =
-              
LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_PROCESSING_BATCH.toLocalizedString(
-                  new Object[] {((InternalDistributedSystem) 
ds).getDistributionManager()
-                      .getDistributedSystemId(), ds.getDistributedMember()});
-          BatchException70 be =
-              new BatchException70(exceptionMessage, e, indexWithoutPDXEvent, 
batchId);
-          exceptions.add(be);
-          if (!removeOnException) {
-            break;
-          }
-
-          // servConn.setAsTrue(RESPONDED);
-          // wroteResponse = true;
-          // break;
-        } else {
-          // If it is early ack mode, attempt to process the remaining messages
-          // in the batch.
-          // This could be problematic depending on where the exception
-          // occurred.
-          return;
-        }
+        // Increment the batch id unless the received batch id is -1 (a
+        // failover batch)
+        DistributedSystem ds = crHelper.getCache().getDistributedSystem();
+        String exceptionMessage = 
LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_PROCESSING_BATCH
+            .toLocalizedString(new Object[] {
+                ((InternalDistributedSystem) 
ds).getDistributionManager().getDistributedSystemId(),
+                ds.getDistributedMember()});
+        BatchException70 be =
+            new BatchException70(exceptionMessage, e, indexWithoutPDXEvent, 
batchId);
+        exceptions.add(be);
       } finally {
         // Increment the partNumber
         if (actionType == 0 /* create */ || actionType == 1 /* update */) {
@@ -784,7 +736,7 @@ public class GatewayReceiverCommand extends BaseCommand {
       serverConnection.incrementLatestBatchIdReplied(batchId);
       writeBatchException(clientMessage, exceptions, serverConnection, 
batchId);
       serverConnection.setAsTrue(RESPONDED);
-    } else if (!wroteResponse) {
+    } else {
       // Increment the batch id unless the received batch id is -1 (a failover
       // batch)
       serverConnection.incrementLatestBatchIdReplied(batchId);
@@ -796,12 +748,8 @@ public class GatewayReceiverCommand extends BaseCommand {
         logger.debug(
             "{}: Sent process batch normal response for batch {} containing {} 
events ({} bytes) with {} acknowledgement on {}",
             serverConnection.getName(), batchId, numberOfEvents, 
clientMessage.getPayloadLength(),
-            (earlyAck ? "early" : "normal"), 
serverConnection.getSocketString());
+            "normal", serverConnection.getSocketString());
       }
-      // logger.warn("Sent process batch normal response for batch " +
-      // batchId + " containing " + numberOfEvents + " events (" +
-      // msg.getPayloadLength() + " bytes) with " + (earlyAck ? "early" :
-      // "normal") + " acknowledgement on " + getSocketString());
     }
   }
 
@@ -818,6 +766,20 @@ public class GatewayReceiverCommand extends BaseCommand {
     return true;
   }
 
+  private void handleException(boolean removeOnException, Exception e) throws 
Exception {
+    if (shouldThrowException(removeOnException, e)) {
+      throw e;
+    } else {
+      Thread.sleep(500);
+    }
+  }
+
+  private boolean shouldThrowException(boolean removeOnException, Exception e) 
{
+    // Split out in case specific exceptions would short-circuit retry logic.
+    // Currently it just considers the boolean.
+    return removeOnException;
+  }
+
   private void handleMessageRetry(LocalRegion region, EntryEventImpl 
clientEvent) {
     if (clientEvent.isPossibleDuplicate()) {
       if (region.getAttributes().getConcurrencyChecksEnabled()) {
diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 6c99168..5bdfd6a 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -622,36 +622,18 @@ public class GatewaySenderEventRemoteDispatcher 
implements GatewaySenderEventDis
               // to resend all the pdx events as well in the next batch.
               final GatewaySenderStats statistics = sender.getStatistics();
               statistics.incBatchesRedistributed();
-              // log batch exceptions and remove all the events if remove from
-              // exception is true
-              // do not remove if it is false
               if (sender.isRemoveFromQueueOnException()) {
                 // log the batchExceptions
                 logBatchExceptions(ack.getBatchException());
                 processor.handleSuccessBatchAck(batchId);
               } else {
-                // we assume that batch exception will not occur for PDX 
related
-                // events
-                List<GatewaySenderEventImpl> pdxEvents =
-                    
processor.getBatchIdToPDXEventsMap().get(ack.getBatchException().getBatchId());
-                if (pdxEvents != null) {
-                  for (GatewaySenderEventImpl senderEvent : pdxEvents) {
-                    senderEvent.isAcked = true;
-                  }
-                }
-                // log the batchExceptions
+                // log the batchExceptions. These are exceptions that were not 
retried on the remote
+                // site (e.g. NotAuthorizedException)
+                // @TODO Should anything else be done here to warn that events 
are lost even though
+                // the boolean is false?
                 logBatchExceptions(ack.getBatchException());
-                // remove the events that have been processed.
-                BatchException70 be = ack.getBatchException();
-                List<BatchException70> exceptions = be.getExceptions();
-
-                for (int i = 0; i < exceptions.get(0).getIndex(); i++) {
-                  processor.eventQueueRemove(1);
-                }
-                // reset the sender
-                processor.handleException();
+                processor.handleSuccessBatchAck(batchId);
               }
-
             } // unsuccessful batch
             else { // The batch was successful.
               if (logger.isDebugEnabled()) {
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java
new file mode 100644
index 0000000..6c89f0f
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.misc;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
+
+  public KeepEventsOnGatewaySenderQueueDUnitTest() {
+    super();
+  }
+
+  @Test
+  public void testBasicKeepEventsOnGatewaySenderQueue() throws Exception {
+    // Start locators
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm2.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    String regionName = getTestMethodName() + "_PR";
+    String senderId = "ny";
+
+    // Configure receiving site members
+    createCacheInVMs(nyPort, vm3, vm4);
+    createReceiverInVMs(vm3, vm4);
+    vm3.invoke(() -> createPartitionedRegionWithPersistence(regionName, null, 
0, 100));
+    vm4.invoke(() -> createPartitionedRegionWithPersistence(regionName, null, 
0, 100));
+    vm4.invoke(() -> assignBuckets(regionName));
+
+    // Configure sending site members
+    createCacheInVMs(lnPort, vm1);
+    vm1.invoke(() -> createSender(senderId, 2, true, 100, 10, false, true, 
null, false));
+    vm1.invoke(() -> disableRemoveFromQueueOnException(senderId));
+    vm1.invoke(() -> createPartitionedRegionWithPersistence(regionName, 
senderId, 0, 100));
+
+    // Asynchronously do puts in sending site member
+    AsyncInvocation<Integer> putInvocation = vm1.invokeAsync(() -> 
doPuts(regionName, 60000l));
+
+    // Repeatedly bounce a receiving site member which will cause 
PartitionOfflineExceptions
+    AsyncInvocation<Integer> closeOpenInvocation =
+        vm3.invokeAsync(() -> closeRecreateCache(nyPort, regionName, 3));
+
+    // Once puts are complete, wait for queue to be empty
+    int numPuts = putInvocation.get(120, TimeUnit.SECONDS);
+    vm1.invoke(() -> validateQueueSizeStat(senderId, 0));
+
+    // Once the receiving site member bounce has completed, verify region 
sizes in both sites
+    closeOpenInvocation.join(120000);
+    vm1.invoke(() -> validateRegionSize(regionName, numPuts));
+    vm3.invoke(() -> validateRegionSize(regionName, numPuts));
+    vm4.invoke(() -> validateRegionSize(regionName, numPuts));
+  }
+
+  private void disableRemoveFromQueueOnException(String senderId) {
+    AbstractGatewaySender ags = (AbstractGatewaySender) 
cache.getGatewaySender(senderId);
+    ags.setRemoveFromQueueOnException(false);
+  }
+
+  private void assignBuckets(String regionName) {
+    Region region = cache.getRegion(regionName);
+    PartitionRegionHelper.assignBucketsToPartitions(region);
+  }
+
+  private int doPuts(String regionName, long timeToRun) throws Exception {
+    int numPuts = 0;
+    Region region = cache.getRegion(regionName);
+    long end = System.currentTimeMillis() + timeToRun;
+    while (System.currentTimeMillis() < end) {
+      region.put(UUID.randomUUID(), 0);
+      numPuts++;
+      Thread.sleep(10);
+    }
+    return numPuts;
+  }
+
+  private void closeRecreateCache(int locatorPort, String regionName, int 
iterations)
+      throws Exception {
+    for (int i = 0; i < iterations; i++) {
+      closeCache();
+      Thread.sleep(5000);
+      createCache(locatorPort);
+      createReceiver();
+      createPartitionedRegionWithPersistence(regionName, null, 0, 100);
+    }
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to