ACTIVEMQ6-94: Using proper flow control on very large messages over the bridge

This will remove some of the verifications written by Howard on his commit.
I did this to simplify the flow control

This closes #197


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/ada112a6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/ada112a6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/ada112a6

Branch: refs/heads/master
Commit: ada112a6a37dce8ddf48e2238904421b2ca8e0dc
Parents: c1111cc
Author: Clebert Suconic <[email protected]>
Authored: Wed Apr 22 21:44:28 2015 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Thu Apr 23 15:48:20 2015 -0400

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    | 14 ++++
 .../activemq/core/protocol/core/Channel.java    | 11 ----
 .../core/impl/ActiveMQSessionContext.java       |  8 ---
 .../core/protocol/core/impl/ChannelImpl.java    | 68 +++++---------------
 .../core/config/BridgeConfiguration.java        | 15 +++++
 .../config/ClusterConnectionConfiguration.java  | 14 ++++
 .../deployers/impl/FileConfigurationParser.java | 12 +++-
 .../core/server/cluster/ClusterManager.java     |  5 +-
 .../cluster/impl/ClusterConnectionImpl.java     | 11 +++-
 .../resources/schema/activemq-configuration.xsd | 20 +++++-
 .../core/config/impl/FileConfigurationTest.java |  4 ++
 .../resources/ConfigurationTest-full-config.xml | 38 ++++++-----
 docs/user-manual/en/clusters.md                 |  4 ++
 docs/user-manual/en/configuration-index.md      |  2 +
 docs/user-manual/en/core-bridges.md             |  6 ++
 .../integration/cluster/bridge/BridgeTest.java  | 36 +++--------
 .../cluster/util/BackupSyncDelay.java           |  6 --
 17 files changed, 145 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git 
a/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java
 
b/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java
index 4049fb4..cea7633 100644
--- 
a/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java
@@ -309,6 +309,10 @@ public final class ActiveMQDefaultConfiguration
    // Once the bridge has received this many bytes, it sends a confirmation
    private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1048576;
 
+   // Producer flow control is disabled by default on the bridge
+   // You probably need to enable this if you use lots of huge messages
+   private static int DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE = -1;
+
    // Upon reconnection this configures the number of time the same node on 
the topology will be retried before reseting the server locator and using the 
initial connectors
    private static int DEFAULT_BRIDGE_CONNECT_SAME_NODE = 10;
 
@@ -870,6 +874,16 @@ public final class ActiveMQDefaultConfiguration
       return DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE;
    }
 
+
+   /**
+    * This default is used for both bridge and cluster connections (since they 
both translate to bridges).
+    * @return the default producer window size (-1 means producer flow control is disabled)
+    */
+   public static int getDefaultBridgeProducerWindowSize()
+   {
+      return DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE;
+   }
+
    /**
     * Upon reconnection this configures the number of time the same node on 
the topology will be retried before reseting the server locator and using the 
initial connectors
     */

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java
----------------------------------------------------------------------
diff --git 
a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java
 
b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java
index c876419..7546952 100644
--- 
a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java
+++ 
b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java
@@ -201,15 +201,4 @@ public interface Channel
     * @param transferring whether the channel is transferring
     */
    void setTransferring(boolean transferring);
-
-   /**
-    * for large message server send, each entry in resend cache will hold a 
reference to
-    * a chunk of bytes which can cause OOM if the cache quickly build up. This 
method
-    * make sure the resent cache size can't be more than one by blocking the 
call.
-    *
-    * @param timeout max waiting time for the resend cache
-    *
-    * @return true if the resend cache gets cleared
-    */
-   boolean largeServerCheck(long timeout);
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git 
a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
 
b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
index a3d532d..7097bd2 100644
--- 
a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
+++ 
b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -461,14 +461,6 @@ public class ActiveMQSessionContext extends SessionContext
       else
       {
          sessionChannel.send(chunkPacket);
-         if (!sessionChannel.largeServerCheck(MAX_RESENDCACHE_WAITING_TIME))
-         {
-            ActiveMQClientLogger.LOGGER.warn("Bridge detected that the target 
server is slow to " +
-                    " send back chunk confirmations. It 's possible the bridge 
may take more memory" +
-                    " during sending of a large message. It may be a temporary 
situation if this warning" +
-                    " occasionally shows up.");
-         }
-
       }
 
       return chunkPacket.getPacketSize();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git 
a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
 
b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
index 25f6f81..a71aed5 100644
--- 
a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
+++ 
b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
@@ -226,27 +226,6 @@ public final class ChannelImpl implements Channel
       this.transferring = transferring;
    }
 
-   @Override
-   public boolean largeServerCheck(long timeout)
-   {
-      if (resendCache == null) return true;
-
-      synchronized (resendCache)
-      {
-         if (resendCache.size() >= 1)
-         {
-            try
-            {
-               resendCache.wait(timeout);
-            }
-            catch (InterruptedException e)
-            {
-            }
-         }
-      }
-      return resendCache.size() == 0;
-   }
-
    // This must never called by more than one thread concurrently
    public boolean send(final Packet packet, final boolean flush, final boolean 
batch)
    {
@@ -628,12 +607,7 @@ public final class ChannelImpl implements Channel
 
          firstStoredCommandID = 0;
 
-         synchronized (resendCache)
-         {
-            resendCache.clear();
-            resendCache.notifyAll();
-         }
-
+         resendCache.clear();
       }
    }
 
@@ -698,38 +672,28 @@ public final class ChannelImpl implements Channel
 
       int sizeToFree = 0;
 
-      try
+      for (int i = 0; i < numberToClear; i++)
       {
-         for (int i = 0; i < numberToClear; i++)
-         {
-            final Packet packet = resendCache.poll();
-
-            if (packet == null)
-            {
-               if (lastReceivedCommandID > 0)
-               {
-                  
ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, 
firstStoredCommandID);
-               }
-               firstStoredCommandID = lastReceivedCommandID + 1;
-               return;
-            }
+         final Packet packet = resendCache.poll();
 
-            if (packet.getType() != PacketImpl.PACKETS_CONFIRMED)
+         if (packet == null)
+         {
+            if (lastReceivedCommandID > 0)
             {
-               sizeToFree += packet.getPacketSize();
+               
ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, 
firstStoredCommandID);
             }
+            firstStoredCommandID = lastReceivedCommandID + 1;
+            return;
+         }
 
-            if (commandConfirmationHandler != null)
-            {
-               commandConfirmationHandler.commandConfirmed(packet);
-            }
+         if (packet.getType() != PacketImpl.PACKETS_CONFIRMED)
+         {
+            sizeToFree += packet.getPacketSize();
          }
-      }
-      finally
-      {
-         synchronized (resendCache)
+
+         if (commandConfirmationHandler != null)
          {
-            resendCache.notifyAll();
+            commandConfirmationHandler.commandConfirmed(packet);
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java
----------------------------------------------------------------------
diff --git 
a/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java
 
b/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java
index 87fae42..65cb2a4 100644
--- 
a/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java
+++ 
b/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java
@@ -56,6 +56,8 @@ public final class BridgeConfiguration implements Serializable
 
    private int confirmationWindowSize = 
ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
 
+   private int producerWindowSize = 
ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
+
    private long clientFailureCheckPeriod = 
ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
 
    private String user = ActiveMQDefaultConfiguration.getDefaultClusterUser();
@@ -302,6 +304,19 @@ public final class BridgeConfiguration implements 
Serializable
       return this;
    }
 
+   /** The producer flow control on the bridge */
+   public BridgeConfiguration setProducerWindowSize(final int 
producerWindowSize)
+   {
+      this.producerWindowSize = producerWindowSize;
+      return this;
+   }
+
+   public int getProducerWindowSize()
+   {
+      return producerWindowSize;
+
+   }
+
    public long getClientFailureCheckPeriod()
    {
       return clientFailureCheckPeriod;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java
----------------------------------------------------------------------
diff --git 
a/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java
 
b/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java
index 3a6c315..ea95dd8 100644
--- 
a/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java
+++ 
b/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java
@@ -63,6 +63,8 @@ public final class ClusterConnectionConfiguration implements 
Serializable
 
    private int confirmationWindowSize = 
ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize();
 
+   private int producerWindowSize = 
ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
+
    private boolean allowDirectConnectionsOnly = false;
 
    private int minLargeMessageSize = 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
@@ -198,6 +200,18 @@ public final class ClusterConnectionConfiguration 
implements Serializable
       return this;
    }
 
+
+   public int getProducerWindowSize()
+   {
+      return producerWindowSize;
+   }
+
+   public ClusterConnectionConfiguration setProducerindowSize(int 
producerWindowSize)
+   {
+      this.producerWindowSize = producerWindowSize;
+      return this;
+   }
+
    public List<String> getStaticConnectors()
    {
       return staticConnectors;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git 
a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java
 
b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java
index 775e60c..3f7c637 100644
--- 
a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java
+++ 
b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java
@@ -1417,6 +1417,10 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          getInteger(e, "confirmation-window-size", 
ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(),
                     Validators.GT_ZERO);
 
+      int producerWindowSize =
+         getInteger(e, "producer-window-size", 
ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(),
+                    Validators.MINUS_ONE_OR_GT_ZERO);
+
       long clusterNotificationInterval = getLong(e, "notification-interval", 
ActiveMQDefaultConfiguration.getDefaultClusterNotificationInterval(), 
Validators.GT_ZERO);
 
       int clusterNotificationAttempts = getInteger(e, "notification-attempts", 
ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), 
Validators.GT_ZERO);
@@ -1468,6 +1472,7 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          .setForwardWhenNoConsumers(forwardWhenNoConsumers)
          .setMaxHops(maxHops)
          .setConfirmationWindowSize(confirmationWindowSize)
+         .setProducerindowSize(producerWindowSize)
          .setAllowDirectConnectionsOnly(allowDirectConnectionsOnly)
          .setClusterNotificationInterval(clusterNotificationInterval)
          .setClusterNotificationAttempts(clusterNotificationAttempts);
@@ -1549,6 +1554,10 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          getInteger(brNode, "reconnect-attempts-same-node", 
ActiveMQDefaultConfiguration.getDefaultBridgeConnectSameNode(),
                     Validators.MINUS_ONE_OR_GE_ZERO);
 
+      int producerWindowSize =
+         getInteger(brNode, "producer-window-size", 
ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(),
+                    Validators.MINUS_ONE_OR_GE_ZERO);
+
       boolean useDuplicateDetection = getBoolean(brNode,
                                                  "use-duplicate-detection",
                                                  
ActiveMQDefaultConfiguration.isDefaultBridgeDuplicateDetection());
@@ -1630,7 +1639,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          .setConfirmationWindowSize(confirmationWindowSize)
          .setHA(ha)
          .setUser(user)
-         .setPassword(password);
+         .setPassword(password)
+         .setProducerWindowSize(producerWindowSize);
 
       if (!staticConnectorNames.isEmpty())
       {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git 
a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java
 
b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java
index 24ed7c0..a289e42 100644
--- 
a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java
+++ 
b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java
@@ -529,8 +529,7 @@ public final class ClusterManager implements 
ActiveMQComponent
       serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
       
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
       serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
-      //disable flow control
-      serverLocator.setProducerWindowSize(-1);
+      serverLocator.setProducerWindowSize(config.getProducerWindowSize());
 
       // This will be set to 30s unless it's changed from embedded / testing
       // there is no reason to exception the config for this timeout
@@ -735,6 +734,7 @@ public final class ClusterManager implements 
ActiveMQComponent
                                                        
config.isDuplicateDetection(),
                                                        
config.isForwardWhenNoConsumers(),
                                                        
config.getConfirmationWindowSize(),
+                                                       
config.getProducerWindowSize(),
                                                        executorFactory,
                                                        server,
                                                        postOffice,
@@ -777,6 +777,7 @@ public final class ClusterManager implements 
ActiveMQComponent
                                                        
config.isDuplicateDetection(),
                                                        
config.isForwardWhenNoConsumers(),
                                                        
config.getConfirmationWindowSize(),
+                                                       
config.getProducerWindowSize(),
                                                        executorFactory,
                                                        server,
                                                        postOffice,

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git 
a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java
 
b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java
index 9ff9948..c02ed57 100644
--- 
a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java
+++ 
b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -113,6 +113,8 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
 
    private final int confirmationWindowSize;
 
+   private final int producerWindowSize;
+
    /**
     * Guard for the field {@link #records}. Note that the field is {@link 
ConcurrentHashMap},
     * however we need the guard to synchronize multiple step operations during 
topology updates.
@@ -179,6 +181,7 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
                                 final boolean useDuplicateDetection,
                                 final boolean routeWhenNoConsumers,
                                 final int confirmationWindowSize,
+                                final int producerWindowSize,
                                 final ExecutorFactory executorFactory,
                                 final ActiveMQServer server,
                                 final PostOffice postOffice,
@@ -220,6 +223,8 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
 
       this.confirmationWindowSize = confirmationWindowSize;
 
+      this.producerWindowSize = producerWindowSize;
+
       this.executorFactory = executorFactory;
 
       this.clusterNotificationInterval = clusterNotificationInterval;
@@ -286,6 +291,7 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
                                 final boolean useDuplicateDetection,
                                 final boolean routeWhenNoConsumers,
                                 final int confirmationWindowSize,
+                                final int producerWindowSize,
                                 final ExecutorFactory executorFactory,
                                 final ActiveMQServer server,
                                 final PostOffice postOffice,
@@ -333,6 +339,8 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
 
       this.confirmationWindowSize = confirmationWindowSize;
 
+      this.producerWindowSize = producerWindowSize;
+
       this.executorFactory = executorFactory;
 
       this.clusterNotificationInterval = clusterNotificationInterval;
@@ -637,8 +645,7 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
          serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
          serverLocator.setCallTimeout(callTimeout);
          serverLocator.setCallFailoverTimeout(callFailoverTimeout);
-         // No producer flow control on the bridges, as we don't want to lock 
the queues
-         serverLocator.setProducerWindowSize(-1);
+         serverLocator.setProducerWindowSize(producerWindowSize);
 
          if (retryInterval > 0)
          {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-server/src/main/resources/schema/activemq-configuration.xsd
----------------------------------------------------------------------
diff --git 
a/activemq-server/src/main/resources/schema/activemq-configuration.xsd 
b/activemq-server/src/main/resources/schema/activemq-configuration.xsd
index 10ab978..31b840c 100644
--- a/activemq-server/src/main/resources/schema/activemq-configuration.xsd
+++ b/activemq-server/src/main/resources/schema/activemq-configuration.xsd
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="UTF-8"?>   
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
@@ -287,7 +287,7 @@
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
-         
+
          <xsd:element name="remoting-incoming-interceptors" 
type="class-name-sequenceType" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
@@ -1049,6 +1049,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" 
minOccurs="0" default="1048576">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Producer flow control on the bridge (default disabled = -1)
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="user" type="xsd:string" maxOccurs="1" 
minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
@@ -1231,6 +1239,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" 
minOccurs="0" default="1048576">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Producer flow control on the bridge (default disabled = -1)
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="call-failover-timeout" type="xsd:long" 
default="-1" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java
 
b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java
index 6f2a689..eb2731e 100644
--- 
a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java
+++ 
b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java
@@ -223,6 +223,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest
             Assert.assertEquals(true, bc.isUseDuplicateDetection());
             Assert.assertEquals("connector1", bc.getStaticConnectors().get(0));
             Assert.assertEquals(null, bc.getDiscoveryGroupName());
+            Assert.assertEquals(444, bc.getProducerWindowSize());
          }
          else
          {
@@ -233,6 +234,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest
             Assert.assertEquals(null, bc.getTransformerClassName());
             Assert.assertEquals(null, bc.getStaticConnectors());
             Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
+            Assert.assertEquals(555, bc.getProducerWindowSize());
          }
       }
 
@@ -267,6 +269,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest
             Assert.assertEquals("connector1", 
ccc.getStaticConnectors().get(0));
             Assert.assertEquals("connector2", 
ccc.getStaticConnectors().get(1));
             Assert.assertEquals(null, ccc.getDiscoveryGroupName());
+            Assert.assertEquals(222, ccc.getProducerWindowSize());
          }
          else
          {
@@ -280,6 +283,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest
             Assert.assertEquals(2, ccc.getMaxHops());
             Assert.assertEquals(Collections.emptyList(), 
ccc.getStaticConnectors());
             Assert.assertEquals("dg1", ccc.getDiscoveryGroupName());
+            Assert.assertEquals(333, ccc.getProducerWindowSize());
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/activemq-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git 
a/activemq-server/src/test/resources/ConfigurationTest-full-config.xml 
b/activemq-server/src/test/resources/ConfigurationTest-full-config.xml
index b16e11a..e55b522 100644
--- a/activemq-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/activemq-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -142,6 +142,7 @@
              <reconnect-attempts>2</reconnect-attempts>
              <failover-on-server-shutdown>false</failover-on-server-shutdown>
              <use-duplicate-detection>true</use-duplicate-detection>
+            <producer-window-size>444</producer-window-size>
              <static-connectors>
                <connector-ref>connector1</connector-ref>
              </static-connectors>
@@ -149,6 +150,7 @@
          <bridge name="bridge2">
              <queue-name>queue2</queue-name>
              
<forwarding-address>bridge-forwarding-address2</forwarding-address>
+            <producer-window-size>555</producer-window-size>
              <discovery-group-ref discovery-group-name="dg1"/>
          </bridge>
       </bridges>
@@ -167,24 +169,25 @@
       </ha-policy>
       <cluster-connections>
          <cluster-connection name="cluster-connection1">
-         <address>queues1</address>
-         <connector-ref>connector1</connector-ref>
-         <check-period>331</check-period>
-         <connection-ttl>3370</connection-ttl>
-         <min-large-message-size>321</min-large-message-size>
-         <call-timeout>123</call-timeout>
-         <retry-interval>3</retry-interval>
-         <retry-interval-multiplier>0.25</retry-interval-multiplier>
-         <max-retry-interval>10000</max-retry-interval>
-         <reconnect-attempts>72</reconnect-attempts>
-         <use-duplicate-detection>true</use-duplicate-detection>
-         <forward-when-no-consumers>false</forward-when-no-consumers>
-         <max-hops>1</max-hops>
-         <call-failover-timeout>123</call-failover-timeout>
-         <static-connectors>
+            <address>queues1</address>
             <connector-ref>connector1</connector-ref>
-            <connector-ref>connector2</connector-ref>
-         </static-connectors>
+            <check-period>331</check-period>
+            <connection-ttl>3370</connection-ttl>
+            <min-large-message-size>321</min-large-message-size>
+            <call-timeout>123</call-timeout>
+            <retry-interval>3</retry-interval>
+            <retry-interval-multiplier>0.25</retry-interval-multiplier>
+            <max-retry-interval>10000</max-retry-interval>
+            <reconnect-attempts>72</reconnect-attempts>
+            <use-duplicate-detection>true</use-duplicate-detection>
+            <forward-when-no-consumers>false</forward-when-no-consumers>
+            <max-hops>1</max-hops>
+            <producer-window-size>222</producer-window-size>
+            <call-failover-timeout>123</call-failover-timeout>
+            <static-connectors>
+               <connector-ref>connector1</connector-ref>
+               <connector-ref>connector2</connector-ref>
+            </static-connectors>
          </cluster-connection>
          <cluster-connection name="cluster-connection2">
              <address>queues2</address>
@@ -194,6 +197,7 @@
              <use-duplicate-detection>false</use-duplicate-detection>
              <forward-when-no-consumers>true</forward-when-no-consumers>
              <max-hops>2</max-hops>
+            <producer-window-size>333</producer-window-size>
              <call-failover-timeout>456</call-failover-timeout>
              <discovery-group-ref discovery-group-name="dg1"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/docs/user-manual/en/clusters.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/clusters.md b/docs/user-manual/en/clusters.md
index 6dc8f15..baf9690 100644
--- a/docs/user-manual/en/clusters.md
+++ b/docs/user-manual/en/clusters.md
@@ -736,6 +736,10 @@ specified. The following shows all the available 
configuration options
     server has received `confirmation-window-size` bytes it notifies its
     client, default is 1048576. A value of -1 means no window.
 
+-   `producer-window-size`. The size for producer flow control over the cluster 
connection.
+     It is disabled by default on the cluster connection bridge, but you 
may want
+     to set a value if you are using really large messages in the cluster. A value 
of -1 means no window.
+
 -   `call-failover-timeout`. Similar to `call-timeout` but used when a
     call is made during a failover attempt. Default is -1 (no timeout).
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/docs/user-manual/en/configuration-index.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/configuration-index.md 
b/docs/user-manual/en/configuration-index.md
index 5991446..994c402 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -127,6 +127,7 @@ Name | Description
 [reconnect-attempts](core-bridges.md "Chapter 36. Core Bridges")               
  |  maximum number of retry attempts, -1 means 'no limits'. default -1
 [use-duplicate-detection](core-bridges.md "Chapter 36. Core Bridges")          
  |  forward duplicate detection headers?. default true
 [confirmation-window-size](core-bridges.md "Chapter 36. Core Bridges")         
  |  number of bytes before confirmations are sent. default 1MB
+[producer-window-size](core-bridges.md "Chapter 36. Core Bridges")             
  |  Producer flow control size on the bridge. Default -1 (disabled)
 [user](core-bridges.md "Chapter 36. Core Bridges")                             
  |  Username for the bridge, the default is the cluster username
 [password](core-bridges.md "Chapter 36. Core Bridges")                         
  |  Password for the bridge, default is the cluster password
 [reconnect-attempts-same-node](core-bridges.md "Chapter 36. Core Bridges")     
  |  Number of retries before trying another node. default 10
@@ -165,6 +166,7 @@ Name | Description
 [forward-when-no-consumers](clusters.md "Chapter 38. Clusters")                
                              |   should messages be load balanced if there are 
no matching consumers on target? Default=false
 [max-hops](clusters.md "Chapter 38. Clusters")                                 
                              |   maximum number of hops cluster topology is 
propagated. Default=1
 [confirmation-window-size](client-reconnection.md "Chapter 34. Client 
Reconnection and Session Reattachment")|   The size (in bytes) of the window 
used for confirming data from the server connected to. Default 1048576
+[producer-window-size](clusters.md "Chapter 38. Clusters")                     
                              |   Flow Control for the Cluster connection 
bridge. Default -1 (disabled)
 [call-failover-timeout](clusters.md "38.3.1. Configuring Cluster Connections") 
                              |   How long to wait for a reply if in the middle 
of a fail-over. -1 means wait forever. Default -1
 [notification-interval](clusters.md "Chapter 38. Clusters")                    
                              |   how often the cluster connection will notify 
the cluster of its existence right after joining the cluster. Default 1000
 [notification-attempts](clusters.md "Chapter 38. Clusters")                    
                              |   how many times this cluster connection will 
notify the cluster of its existence right after joining the cluster Default 2

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/docs/user-manual/en/core-bridges.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/core-bridges.md 
b/docs/user-manual/en/core-bridges.md
index 2965d3f..3bf4f9d 100644
--- a/docs/user-manual/en/core-bridges.md
+++ b/docs/user-manual/en/core-bridges.md
@@ -195,6 +195,12 @@ Let's take a look at all the parameters in turn:
     > `confirmation-window-size` is less than or equal to
     > `max-size-bytes` to prevent the flow of messages from ceasing.
 
+-   `producer-window-size`. This optional parameter determines the
+    producer flow control through the bridge. You usually leave this off
+    unless you are dealing with very large messages. 
+    
+    Default=-1 (disabled)
+
 -   `user`. This optional parameter determines the user name to use when
     creating the bridge connection to the remote server. If it is not
     specified the default cluster user specified by `cluster-user` in

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java
index ff4adf3..b466c3a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java
@@ -1795,7 +1795,7 @@ public class BridgeTest extends ServiceTestBase
          ArrayList<String> staticConnectors = new ArrayList<String>();
          staticConnectors.add(server1tc.getName());
 
-         int minLargeMessageSize = 50 * 1024 * 1024; //50M
+         int minLargeMessageSize = 1024 * 1024;
 
          BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
                  .setName("bridge1")
@@ -1806,7 +1806,8 @@ public class BridgeTest extends ServiceTestBase
                  .setUseDuplicateDetection(false)
                  .setConfirmationWindowSize(1024)
                  .setStaticConnectors(staticConnectors)
-                 .setMinLargeMessageSize(minLargeMessageSize);
+                 .setMinLargeMessageSize(minLargeMessageSize)
+                 .setProducerWindowSize(minLargeMessageSize / 2);
 
          List<BridgeConfiguration> bridgeConfigs = new 
ArrayList<BridgeConfiguration>();
          bridgeConfigs.add(bridgeConfiguration);
@@ -1847,10 +1848,9 @@ public class BridgeTest extends ServiceTestBase
          session1.start();
 
          //create a large message bigger than Integer.MAX_VALUE
-         final long largeMessageSize = 3L * 1024L * 1024L * 1024L;
+         final long largeMessageSize = Integer.MAX_VALUE + 1000L;
 
-         File destDir = createDestDir("testBridgeWithVeryLargeMessage");
-         ClientMessage largeMessage = createLargeMessage(session0, 
largeMessageSize, destDir);
+         ClientMessage largeMessage = createLargeMessage(session0, 
largeMessageSize);
 
          producer0.send(largeMessage);
 
@@ -1878,7 +1878,7 @@ public class BridgeTest extends ServiceTestBase
          ClientMessage message = consumer1.receive(5000);
          message.acknowledge();
 
-         File outputFile = new File(destDir, "huge_message_received.dat");
+         File outputFile = new File(getTemporaryDir(), 
"huge_message_received.dat");
 
          System.out.println("-----message save to: " + 
outputFile.getAbsolutePath());
          FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
@@ -1930,30 +1930,10 @@ public class BridgeTest extends ServiceTestBase
       assertEquals(0, loadQueues(server0).size());
    }
 
-   private File createDestDir(String dirName)
-   {
-      File clientDir = new File(getClientLargeMessagesDir());
-      if (!clientDir.exists())
-      {
-         if (!clientDir.mkdirs())
-         {
-            throw new IllegalStateException("Can't create dir " + 
clientDir.getAbsolutePath());
-         }
-      }
-
-      File destDir = new File(clientDir, dirName);
-      if (!destDir.mkdir())
-      {
-         throw new IllegalStateException("Can't create dir " + 
destDir.getAbsolutePath());
-      }
-      return destDir;
-   }
-
-
-   private ClientMessage createLargeMessage(ClientSession session, long 
largeMessageSize, File destDir) throws Exception
+   private ClientMessage createLargeMessage(ClientSession session, long 
largeMessageSize) throws Exception
    {
 
-      File fileInput = new File(destDir, "huge_message_to_send.dat");
+      File fileInput = new File(getTemporaryDir(), "huge_message_to_send.dat");
 
       createFile(fileInput, largeMessageSize);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/ada112a6/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java
index 95506d5..710c4d3 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java
@@ -376,12 +376,6 @@ public class BackupSyncDelay implements Interceptor
       }
 
       @Override
-      public boolean largeServerCheck(long timeout)
-      {
-         return true;
-      }
-
-      @Override
       public boolean supports(byte packetID)
       {
          return true;

Reply via email to