GEODE-2937: Restore removeFromQueueOnException

Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d9343d44
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d9343d44
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d9343d44

Branch: refs/heads/feature/GEODE-2580
Commit: d9343d4406ac2b0625a9acf40a7ec20d6ac2cf36
Parents: e216fde
Author: Jason Huynh <huyn...@gmail.com>
Authored: Wed May 17 18:19:15 2017 -0700
Committer: Jason Huynh <huyn...@gmail.com>
Committed: Thu May 18 10:35:29 2017 -0700

----------------------------------------------------------------------
 .../cache/wan/AbstractGatewaySender.java        | 14 +++++++++
 .../AbstractGatewaySenderEventProcessor.java    |  6 ++--
 .../GatewaySenderEventCallbackDispatcher.java   |  4 +--
 .../cache/wan/GatewaySenderEventDispatcher.java |  2 +-
 .../cache/wan/AsyncEventQueueTestBase.java      | 26 +++++++++++++++
 .../client/internal/GatewaySenderBatchOp.java   | 16 ++++++----
 .../cache/client/internal/SenderProxy.java      |  6 ++--
 .../wan/GatewaySenderEventRemoteDispatcher.java | 33 +++++++++++++++++---
 8 files changed, 90 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/d9343d44/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 0ba40b4..7ed9b51 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -640,6 +640,20 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     return this.myDSId;
   }
 
+  /**
+   * @param removeFromQueueOnException the removeFromQueueOnException to set
+   */
+  public void setRemoveFromQueueOnException(boolean removeFromQueueOnException) {
+    this.removeFromQueueOnException = removeFromQueueOnException;
+  }
+
+  /**
+   * @return the removeFromQueueOnException
+   */
+  public boolean isRemoveFromQueueOnException() {
+    return removeFromQueueOnException;
+  }
+
   public CancelCriterion getStopper() {
     return this.stopper;
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/d9343d44/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 702438f..0c93755 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -606,7 +606,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
                 conflatedEventsToBeDispatched);
           }
 
-          boolean success = this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched, false);
+          boolean success = this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched,
+              sender.isRemoveFromQueueOnException(), false);
           if (success) {
             if (isDebugEnabled) {
               logger.debug(
@@ -650,7 +651,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
             } else {
               handleUnSuccessfulBatchDispatch(events);
               if (!resetLastPeekedEvents) {
-                while (!this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched, true)) {
+                while (!this.dispatcher.dispatchBatch(conflatedEventsToBeDispatched,
+                    sender.isRemoveFromQueueOnException(), true)) {
                   if (isDebugEnabled) {
                     logger.debug(
                         "During normal processing, unsuccessfully dispatched {} events (batch #{})",

http://git-wip-us.apache.org/repos/asf/geode/blob/d9343d44/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
index eb3c735..efdd0ce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
@@ -65,10 +65,10 @@ public class GatewaySenderEventCallbackDispatcher implements GatewaySenderEventD
    * Dispatches a batch of messages to all registered <code>AsyncEventListener</code>s.
    * 
    * @param events The <code>List</code> of events to send
-   * 
+   * @param removeFromQueueOnException Unused.
    * @return whether the batch of messages was successfully processed
    */
-  public boolean dispatchBatch(List events, boolean isRetry) {
+  public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry) {
     GatewaySenderStats statistics = this.eventProcessor.sender.getStatistics();
     boolean success = false;
     try {

http://git-wip-us.apache.org/repos/asf/geode/blob/d9343d44/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
index 807e386..5bb5333 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
@@ -22,7 +22,7 @@ import java.util.List;
  */
 public interface GatewaySenderEventDispatcher {
 
-  public boolean dispatchBatch(List events, boolean isRetry);
+  public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry);
 
   public boolean isRemoteDispatcher();
 

http://git-wip-us.apache.org/repos/asf/geode/blob/d9343d44/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 1595e99..5d4fd98 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -809,6 +809,32 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
         + statistics.getUnprocessedTokensAddedByPrimary()));
   }
 
+  public static void setRemoveFromQueueOnException(String senderId, boolean removeFromQueue) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    assertNotNull(sender);
+    ((AbstractGatewaySender) sender).setRemoveFromQueueOnException(removeFromQueue);
+  }
+
+  public static void unsetRemoveFromQueueOnException(String senderId) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    assertNotNull(sender);
+    ((AbstractGatewaySender) sender).setRemoveFromQueueOnException(false);
+  }
+
   public static void waitForSenderToBecomePrimary(String senderId) {
     Set<GatewaySender> senders = ((GemFireCacheImpl) cache).getAllGatewaySenders();
     final GatewaySender sender = getGatewaySenderById(senders, senderId);

http://git-wip-us.apache.org/repos/asf/geode/blob/d9343d44/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
index 7fc762fe6..b8616a9 100755
--- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
@@ -48,18 +48,22 @@ public class GatewaySenderBatchOp {
    * @param pool the pool to use to communicate with the server.
    * @param events list of gateway events
    * @param batchId the ID of this batch
+   * @param removeFromQueueOnException true if the events should be processed even after some
+   *        exception
    */
   public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId,
-      boolean isRetry) {
+      boolean removeFromQueueOnException, boolean isRetry) {
     AbstractOp op = null;
     // System.out.println("Version: "+con.getWanSiteVersion());
     // Is this check even needed anymore? It looks like we just create the same exact op impl with
     // the same parameters...
     if (Version.GFE_651.compareTo(con.getWanSiteVersion()) >= 0) {
-      op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry);
+      op = new GatewaySenderGFEBatchOpImpl(events, batchId, removeFromQueueOnException,
+          con.getDistributedSystemId(), isRetry);
     } else {
       // Default should create a batch of server version (ACCEPTOR.VERSION)
-      op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry);
+      op = new GatewaySenderGFEBatchOpImpl(events, batchId, removeFromQueueOnException,
+          con.getDistributedSystemId(), isRetry);
     }
     pool.executeOn(con, op, true/* timeoutFatal */);
   }
@@ -79,9 +83,9 @@ public class GatewaySenderBatchOp {
     /**
      * @throws org.apache.geode.SerializationException if serialization fails
      */
-    public GatewaySenderGFEBatchOpImpl(List events, int batchId, int dsId, boolean isRetry) {
+    public GatewaySenderGFEBatchOpImpl(List events, int batchId, boolean removeFromQueueOnException,
+        int dsId, boolean isRetry) {
       super(MessageType.GATEWAY_RECEIVER_COMMAND, calcPartCount(events));
-      boolean removeFromQueueOnException = true;
       if (isRetry) {
         getMessage().setIsRetry();
       }
@@ -258,7 +262,7 @@ public class GatewaySenderBatchOp {
               List<BatchException70> l = (List<BatchException70>) part0.getObject();
 
               if (logger.isDebugEnabled()) {
-                logger.info(
+                logger.debug(
                     "We got an exception from the GatewayReceiver. MessageType : {} obj :{}",
                     msg.getMessageType(), obj);
               }

http://git-wip-us.apache.org/repos/asf/geode/blob/d9343d44/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java
index 1ef9425..c6d283c 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java
@@ -29,8 +29,10 @@ public class SenderProxy extends ServerProxy {
     super(pool);
   }
 
-  public void dispatchBatch_NewWAN(Connection con, List events, int batchId, boolean isRetry) {
-    GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, isRetry);
+  public void dispatchBatch_NewWAN(Connection con, List events, int batchId,
+      boolean removeFromQueueOnException, boolean isRetry) {
+    GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, removeFromQueueOnException,
+        isRetry);
   }
 
   public Object receiveAckFromReceiver(Connection con) {

http://git-wip-us.apache.org/repos/asf/geode/blob/d9343d44/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 3eec101..3a41972 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -140,7 +140,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
   }
 
   @Override
-  public boolean dispatchBatch(List events, boolean isRetry) {
+  public boolean dispatchBatch(List events, boolean removeFromQueueOnException, boolean isRetry) {
     GatewaySenderStats statistics = this.sender.getStatistics();
     boolean success = false;
     try {
@@ -212,7 +212,8 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       this.connectionLifeCycleLock.readLock().lock();
       try {
         if (connection != null) {
-          sp.dispatchBatch_NewWAN(connection, events, currentBatchId, isRetry);
+          sp.dispatchBatch_NewWAN(connection, events, currentBatchId,
+              sender.isRemoveFromQueueOnException(), isRetry);
           if (logger.isDebugEnabled()) {
             logger.debug(
                 "{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}",
@@ -621,8 +622,32 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
               // log batch exceptions and remove all the events if remove from
               // exception is true
               // do not remove if it is false
-              logBatchExceptions(ack.getBatchException());
-              processor.handleSuccessBatchAck(batchId);
+              if (sender.isRemoveFromQueueOnException()) {
+                // log the batchExceptions
+                logBatchExceptions(ack.getBatchException());
+                processor.handleSuccessBatchAck(batchId);
+              } else {
+                // we assume that batch exception will not occur for PDX related
+                // events
+                List<GatewaySenderEventImpl> pdxEvents =
+                    processor.getBatchIdToPDXEventsMap().get(ack.getBatchException().getBatchId());
+                if (pdxEvents != null) {
+                  for (GatewaySenderEventImpl senderEvent : pdxEvents) {
+                    senderEvent.isAcked = true;
+                  }
+                }
+                // log the batchExceptions
+                logBatchExceptions(ack.getBatchException());
+                // remove the events that have been processed.
+                BatchException70 be = ack.getBatchException();
+                List<BatchException70> exceptions = be.getExceptions();
+
+                for (int i = 0; i < exceptions.get(0).getIndex(); i++) {
+                  processor.eventQueueRemove(1);
+                }
+                // reset the sender
+                processor.handleException();
+              }
 
             } // unsuccessful batch
             else { // The batch was successful.

Reply via email to