Repository: activemq-artemis
Updated Branches:
  refs/heads/master d435a3d00 -> fce14f1f3


ARTEMIS-328 Fixing message loss through the bridge

https://issues.apache.org/jira/browse/ARTEMIS-328


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9167213f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9167213f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9167213f

Branch: refs/heads/master
Commit: 9167213f00cc0c7f506c7986a551dbc4fecd2dea
Parents: d435a3d
Author: Clebert Suconic <[email protected]>
Authored: Mon Dec 21 22:50:10 2015 -0500
Committer: Clebert Suconic <[email protected]>
Committed: Mon Dec 21 23:00:52 2015 -0500

----------------------------------------------------------------------
 .../core/protocol/core/impl/ChannelImpl.java    |  33 +++---
 .../core/postoffice/DuplicateIDCache.java       |   9 ++
 .../postoffice/impl/DuplicateIDCacheImpl.java   |  23 ++++-
 .../core/postoffice/impl/PostOfficeImpl.java    |   5 +-
 .../core/server/cluster/impl/BridgeImpl.java    | 102 ++++++++++++++-----
 .../cluster/impl/ClusterConnectionBridge.java   |  22 ++--
 6 files changed, 139 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9167213f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index a0985fc..9e5ccb3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -260,7 +260,7 @@ public final class ChannelImpl implements Channel {
             }
 
             if (resendCache != null && packet.isRequiresConfirmations()) {
-               resendCache.add(packet);
+               addResendPacket(packet);
             }
          }
          finally {
@@ -332,7 +332,7 @@ public final class ChannelImpl implements Channel {
             response = null;
 
             if (resendCache != null && packet.isRequiresConfirmations()) {
-               resendCache.add(packet);
+               addResendPacket(packet);
             }
 
             connection.getTransportConnection().write(buffer, false, false);
@@ -523,6 +523,10 @@ public final class ChannelImpl implements Channel {
 
          confirmed.setChannelID(id);
 
+         if (isTrace) {
+            ActiveMQClientLogger.LOGGER.trace("ChannelImpl::flushConfirmation 
flushing confirmation " + confirmed);
+         }
+
          doWrite(confirmed);
       }
    }
@@ -598,30 +602,35 @@ public final class ChannelImpl implements Channel {
       connection.getTransportConnection().write(buffer, false, false);
    }
 
+   private void addResendPacket(Packet packet) {
+      resendCache.add(packet);
+
+      if (isTrace) {
+         ActiveMQClientLogger.LOGGER.trace("ChannelImpl::addResendPacket 
adding packet " + packet + " stored commandID=" + firstStoredCommandID + " 
possible commandIDr=" + (firstStoredCommandID + resendCache.size()));
+      }
+   }
+
    private void clearUpTo(final int lastReceivedCommandID) {
       final int numberToClear = 1 + lastReceivedCommandID - 
firstStoredCommandID;
 
-      if (numberToClear == -1) {
-         throw 
ActiveMQClientMessageBundle.BUNDLE.invalidCommandID(lastReceivedCommandID);
+      if (isTrace) {
+         ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo 
lastReceived commandID=" + lastReceivedCommandID +
+                                              " first commandID=" + 
firstStoredCommandID +
+                                              " number to clear " + 
numberToClear);
       }
 
-      int sizeToFree = 0;
-
       for (int i = 0; i < numberToClear; i++) {
          final Packet packet = resendCache.poll();
 
          if (packet == null) {
-            if (lastReceivedCommandID > 0) {
-               
ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, 
firstStoredCommandID);
-            }
+            
ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, 
firstStoredCommandID);
             firstStoredCommandID = lastReceivedCommandID + 1;
             return;
          }
 
-         if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) {
-            sizeToFree += packet.getPacketSize();
+         if (isTrace) {
+            ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo 
confirming " + packet + " towards " + commandConfirmationHandler);
          }
-
          if (commandConfirmationHandler != null) {
             commandConfirmationHandler.commandConfirmed(packet);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9167213f/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
index 35d2f83..f316c56 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
@@ -25,8 +25,17 @@ public interface DuplicateIDCache {
 
    boolean contains(byte[] duplicateID);
 
+   void addToCache(byte[] duplicateID) throws Exception;
+
    void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
 
+   /**
+    * it will add the data to the cache.
+    * If TX == null it won't use a transaction.
+    * if instantAdd=true, it won't wait a transaction to add on the cache which is needed on the case of the Bridges
+    */
+   void addToCache(byte[] duplicateID, Transaction tx, boolean instantAdd) 
throws Exception;
+
    void deleteFromCache(byte[] duplicateID) throws Exception;
 
    void load(List<Pair<byte[], Long>> theIds) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9167213f/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
index 0ce6366..7059671 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
@@ -137,7 +137,17 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
    }
 
    @Override
-   public synchronized void addToCache(final byte[] duplID, final Transaction 
tx) throws Exception {
+   public void addToCache(final byte[] duplID) throws Exception {
+      addToCache(duplID, null, false);
+   }
+
+   @Override
+   public void addToCache(final byte[] duplID, final Transaction tx) throws 
Exception {
+      addToCache(duplID, tx, false);
+   }
+
+   @Override
+   public synchronized void addToCache(final byte[] duplID, final Transaction 
tx, boolean instantAdd) throws Exception {
       long recordID = -1;
 
       if (tx == null) {
@@ -156,9 +166,14 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
             tx.setContainsPersistent();
          }
 
-         // For a tx, it's important that the entry is not added to the cache until commit
-         // since if the client fails then resends them tx we don't want it to get rejected
-         tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
+         if (instantAdd) {
+            addToCacheInMemory(duplID, recordID);
+         }
+         else {
+            // For a tx, it's important that the entry is not added to the cache until commit
+            // since if the client fails then resends them tx we don't want it to get rejected
+            tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9167213f/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index da22bb9..4b25023 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1177,7 +1177,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             }
          }
 
-         cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
+         // on the bridge case there is a case where the bridge reconnects
+         // and the send hasn't finished yet (think of CPU outages).
+         // for that reason we add the cache right away
+         cacheBridge.addToCache(bridgeDupBytes, context.getTransaction(), 
true);
 
          message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9167213f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index f638fbc..d81dd3e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -18,9 +18,10 @@ package org.apache.activemq.artemis.core.server.cluster.impl;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.ListIterator;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -98,7 +99,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    private final SimpleString forwardingAddress;
 
-   private final java.util.Queue<MessageReference> refs = new 
ConcurrentLinkedQueue<>();
+   private final java.util.Map<Long, MessageReference> refs = new 
LinkedHashMap<>();
 
    private final Transformer transformer;
 
@@ -127,6 +128,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    protected volatile ClientSessionInternal session;
 
+   // on cases where sub-classes need a consumer
+   protected volatile ClientSessionInternal sessionConsumer;
+
    protected String targetNodeID;
 
    protected TopologyMember targetNode;
@@ -230,8 +234,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
     */
    @Override
    public List<MessageReference> getDeliveringMessages() {
-      synchronized (this) {
-         return new ArrayList<>(refs);
+      synchronized (refs) {
+         return new ArrayList<>(refs.values());
       }
    }
 
@@ -271,34 +275,43 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    }
 
    private void cancelRefs() {
-      MessageReference ref;
-
       LinkedList<MessageReference> list = new LinkedList<>();
 
-      while ((ref = refs.poll()) != null) {
-         if (isTrace) {
-            ActiveMQServerLogger.LOGGER.trace("Cancelling reference " + ref + 
" on bridge " + this);
-         }
-         list.addFirst(ref);
+      synchronized (refs) {
+         list.addAll(refs.values());
+         refs.clear();
+      }
+
+      if (isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("BridgeImpl::cancelRefs cancelling 
" + list.size() + " references");
       }
 
       if (isTrace && list.isEmpty()) {
          ActiveMQServerLogger.LOGGER.trace("didn't have any references to 
cancel on bridge " + this);
+         return;
       }
 
-      Queue refqueue = null;
+      ListIterator<MessageReference> listIterator = 
list.listIterator(list.size());
+
+      Queue refqueue;
 
       long timeBase = System.currentTimeMillis();
 
-      for (MessageReference ref2 : list) {
-         refqueue = ref2.getQueue();
+      while (listIterator.hasPrevious()) {
+         MessageReference ref = listIterator.previous();
+
+         if (isTrace) {
+            ActiveMQServerLogger.LOGGER.trace("BridgeImpl::cancelRefs 
Cancelling reference " + ref + " on bridge " + this);
+         }
+
+         refqueue = ref.getQueue();
 
          try {
-            refqueue.cancel(ref2, timeBase);
+            refqueue.cancel(ref, timeBase);
          }
          catch (Exception e) {
             // There isn't much we can do besides log an error
-            ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref2);
+            ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);
          }
       }
    }
@@ -317,10 +330,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       }
    }
 
-   @Override
    public void disconnect() {
       executor.execute(new Runnable() {
-         @Override
          public void run() {
             if (session != null) {
                try {
@@ -331,6 +342,15 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                }
                session = null;
             }
+            if (sessionConsumer != null) {
+               try {
+                  sessionConsumer.cleanUp(false);
+               }
+               catch (Exception dontcare) {
+                  ActiveMQServerLogger.LOGGER.debug(dontcare.getMessage(), 
dontcare);
+               }
+               sessionConsumer = null;
+            }
          }
       });
    }
@@ -379,7 +399,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       }
    }
 
-   @Override
    public void pause() throws Exception {
       if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
          ActiveMQServerLogger.LOGGER.debug("Bridge " + this.name + " being 
paused");
@@ -452,17 +471,30 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    @Override
    public void sendAcknowledged(final Message message) {
+      if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
+         ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged 
received confirmation for message " + message);
+      }
       if (active) {
          try {
-            final MessageReference ref = refs.poll();
+
+            final MessageReference ref;
+
+            synchronized (refs) {
+               ref = refs.remove(message.getMessageID());
+            }
 
             if (ref != null) {
                if (isTrace) {
-                  ActiveMQServerLogger.LOGGER.trace(this + " Acking " + ref + 
" on queue " + ref.getQueue());
+                  
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged bridge " + this 
+ " Acking " + ref + " on queue " + ref.getQueue());
                }
                ref.getQueue().acknowledge(ref);
                pendingAcks.countDown();
             }
+            else {
+               if (isTrace) {
+                  
ActiveMQServerLogger.LOGGER.trace("BridgeImpl::sendAcknowledged bridge " + this 
+ " could not find reference for message " + message);
+               }
+            }
          }
          catch (Exception e) {
             ActiveMQServerLogger.LOGGER.bridgeFailedToAck(e);
@@ -529,7 +561,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
          ref.handled();
 
-         refs.add(ref);
+         synchronized (refs) {
+            refs.put(ref.getMessage().getMessageID(), ref);
+         }
 
          final ServerMessage message = beforeForward(ref.getMessage());
 
@@ -686,9 +720,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       catch (final ActiveMQException e) {
          ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(e, ref);
 
-         // We remove this reference as we are returning busy which means the reference will never leave the Queue.
-         // because of this we have to remove the reference here
-         refs.remove(ref);
+         synchronized (refs) {
+            // We remove this reference as we are returning busy which means the reference will never leave the Queue.
+            // because of this we have to remove the reference here
+            refs.remove(message.getMessageID());
+         }
 
          connectionFailed(e, false);
 
@@ -840,6 +876,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                }
                // Session is pre-acknowledge
                session = (ClientSessionInternal) csf.createSession(user, 
password, false, true, true, true, 1);
+               sessionConsumer = (ClientSessionInternal) 
csf.createSession(user, password, false, true, true, true, 1);
             }
 
             if (forwardingAddress != null) {
@@ -1031,7 +1068,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
          bridge = bridge2;
       }
 
-      @Override
       public void run() {
          bridge.connect();
       }
@@ -1058,8 +1094,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
             }
 
-            internalCancelReferences();
-
             if (session != null) {
                ActiveMQServerLogger.LOGGER.debug("Cleaning up session " + 
session);
                session.removeFailureListener(BridgeImpl.this);
@@ -1071,6 +1105,18 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                }
             }
 
+            if (sessionConsumer != null) {
+               ActiveMQServerLogger.LOGGER.debug("Cleaning up session " + 
session);
+               try {
+                  sessionConsumer.close();
+                  sessionConsumer = null;
+               }
+               catch (ActiveMQException dontcare) {
+               }
+            }
+
+            internalCancelReferences();
+
             if (csf != null) {
                csf.cleanup();
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9167213f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 7bcc6b2..ad34d23 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.filter.Filter;
@@ -42,9 +43,9 @@ import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import 
org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
-import 
org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
 import org.apache.activemq.artemis.core.server.cluster.Transformer;
 import org.apache.activemq.artemis.utils.UUID;
@@ -240,27 +241,28 @@ public class ClusterConnectionBridge extends BridgeImpl {
                                                    
createSelectorFromAddress(flowRecord.getAddress()) +
                                                    ")");
 
-         session.createTemporaryQueue(managementNotificationAddress, 
notifQueueName, filter);
+         sessionConsumer.createTemporaryQueue(managementNotificationAddress, 
notifQueueName, filter);
 
-         notifConsumer = session.createConsumer(notifQueueName);
+         notifConsumer = sessionConsumer.createConsumer(notifQueueName);
 
          notifConsumer.setMessageHandler(flowRecord);
 
-         session.start();
+         sessionConsumer.start();
 
-         ClientMessage message = session.createMessage(false);
-         if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
-            ActiveMQServerLogger.LOGGER.trace("Requesting sendQueueInfoToQueue 
through " + this, new Exception("trace"));
+         ClientMessage message = sessionConsumer.createMessage(false);
+         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+            ActiveMQClientLogger.LOGGER.trace("Requesting sendQueueInfoToQueue 
through " + this, new Exception("trace"));
          }
          ManagementHelper.putOperationInvocation(message, 
ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", notifQueueName.toString(), 
flowRecord.getAddress());
 
-         ClientProducer prod = session.createProducer(managementAddress);
+         ClientProducer prod = 
sessionConsumer.createProducer(managementAddress);
 
-         if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-            ActiveMQServerLogger.LOGGER.debug("Cluster connetion bridge on " + 
clusterConnection + " requesting information on queues");
+         if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
+            ActiveMQClientLogger.LOGGER.debug("Cluster connection bridge on " 
+ clusterConnection + " requesting information on queues");
          }
 
          prod.send(message);
+         prod.close();
       }
    }
 

Reply via email to