Repository: activemq-artemis
Updated Branches:
  refs/heads/master 425fe8675 -> 7d69d913e


ARTEMIS-569 fix bridge producerWindowSize

In April 2015, commit 8f52a622d0d883ca5e9f60ba7754ed51de38cc5c reverted,
for no apparent reason, the changes from both
c1111cc156684b15938ab3f8e34df9f4b64f57c4 and
ada112a6a37dce8ddf48e2238904421b2ca8e0dc. This commit restores them,
re-adding the producer-window-size setting for bridges and cluster
connections.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ce9ea176
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ce9ea176
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ce9ea176

Branch: refs/heads/master
Commit: ce9ea1760a473531f8175c9ca3c72f03fd8c617d
Parents: 425fe86
Author: jbertram <[email protected]>
Authored: Fri Jun 24 16:50:25 2016 -0500
Committer: jbertram <[email protected]>
Committed: Fri Jun 24 16:54:48 2016 -0500

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |  10 +
 .../core/management/ActiveMQServerControl.java  |   1 +
 .../core/config/BridgeConfiguration.java        |  18 ++
 .../config/ClusterConnectionConfiguration.java  | 290 +++++++++++--------
 .../deployers/impl/FileConfigurationParser.java |  49 +++-
 .../impl/ActiveMQServerControlImpl.java         |  19 +-
 .../core/server/cluster/ClusterManager.java     |   7 +-
 .../cluster/impl/ClusterConnectionImpl.java     |  11 +-
 .../resources/schema/artemis-configuration.xsd  |  16 +
 .../core/config/impl/FileConfigurationTest.java |   4 +
 .../resources/ConfigurationTest-full-config.xml |   4 +
 .../integration/cluster/bridge/BridgeTest.java  | 195 +++++++++++++
 .../management/ActiveMQServerControlTest.java   |   1 +
 13 files changed, 492 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 8fe6e40..1239b0b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -301,6 +301,9 @@ public final class ActiveMQDefaultConfiguration {
    // Once the bridge has received this many bytes, it sends a confirmation
    private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1048576;
 
+   // Producer flow control
+   private static int DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE = -1;
+
    // Upon reconnection this configures the number of time the same node on 
the topology will be retried before reseting the server locator and using the 
initial connectors
    private static int DEFAULT_BRIDGE_CONNECT_SAME_NODE = 10;
 
@@ -841,6 +844,13 @@ public final class ActiveMQDefaultConfiguration {
    }
 
    /**
+    * Producer flow control
+    */
+   public static int getDefaultBridgeProducerWindowSize() {
+      return DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE;
+   }
+
+   /**
     * Upon reconnection this configures the number of time the same node on 
the topology will be retried before reseting the server locator and using the 
initial connectors
     */
    public static int getDefaultBridgeConnectSameNode() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 677bfb6..de9bc9f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -698,6 +698,7 @@ public interface ActiveMQServerControl {
                      @Parameter(name = "reconnectAttempts", desc = "Number of 
reconnection attempts") int reconnectAttempts,
                      @Parameter(name = "useDuplicateDetection", desc = "Use 
duplicate detection") boolean useDuplicateDetection,
                      @Parameter(name = "confirmationWindowSize", desc = 
"Confirmation window size") int confirmationWindowSize,
+                     @Parameter(name = "producerWindowSize", desc = "Producer 
window size") int producerWindowSize,
                      @Parameter(name = "clientFailureCheckPeriod", desc = 
"Period to check client failure") long clientFailureCheckPeriod,
                      @Parameter(name = "staticConnectorNames", desc = "comma 
separated list of connector names or name of discovery group if 
'useDiscoveryGroup' is set to true") String connectorNames,
                      @Parameter(name = "useDiscoveryGroup", desc = "use 
discovery  group") boolean useDiscoveryGroup,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
index 0ce0ce8..f07fc17 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
@@ -56,6 +56,9 @@ public final class BridgeConfiguration implements 
Serializable {
 
    private int confirmationWindowSize = 
ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
 
+   // disable flow control
+   private int producerWindowSize = 
ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
+
    private long clientFailureCheckPeriod = 
ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
 
    private String user = ActiveMQDefaultConfiguration.getDefaultClusterUser();
@@ -267,6 +270,18 @@ public final class BridgeConfiguration implements 
Serializable {
       return this;
    }
 
+   public int getProducerWindowSize() {
+      return producerWindowSize;
+   }
+
+   /**
+    * @param producerWindowSize the producerWindowSize to set
+    */
+   public BridgeConfiguration setProducerWindowSize(final int 
producerWindowSize) {
+      this.producerWindowSize = producerWindowSize;
+      return this;
+   }
+
    public long getClientFailureCheckPeriod() {
       return clientFailureCheckPeriod;
    }
@@ -340,6 +355,7 @@ public final class BridgeConfiguration implements 
Serializable {
       result = prime * result + (int) (callTimeout ^ (callTimeout >>> 32));
       result = prime * result + (int) (clientFailureCheckPeriod ^ 
(clientFailureCheckPeriod >>> 32));
       result = prime * result + confirmationWindowSize;
+      result = prime * result + producerWindowSize;
       result = prime * result + (int) (connectionTTL ^ (connectionTTL >>> 32));
       result = prime * result + ((discoveryGroupName == null) ? 0 : 
discoveryGroupName.hashCode());
       result = prime * result + ((filterString == null) ? 0 : 
filterString.hashCode());
@@ -378,6 +394,8 @@ public final class BridgeConfiguration implements 
Serializable {
          return false;
       if (confirmationWindowSize != other.confirmationWindowSize)
          return false;
+      if (producerWindowSize != other.producerWindowSize)
+         return false;
       if (connectionTTL != other.connectionTTL)
          return false;
       if (discoveryGroupName == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
index 3c715af..2fecc6f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
@@ -62,7 +62,8 @@ public final class ClusterConnectionConfiguration implements 
Serializable {
 
    private boolean duplicateDetection = 
ActiveMQDefaultConfiguration.isDefaultClusterDuplicateDetection();
 
-   private MessageLoadBalancingType messageLoadBalancingType = 
Enum.valueOf(MessageLoadBalancingType.class, 
ActiveMQDefaultConfiguration.getDefaultClusterMessageLoadBalancingType());
+   private MessageLoadBalancingType messageLoadBalancingType = 
Enum.valueOf(MessageLoadBalancingType.class, ActiveMQDefaultConfiguration
+      .getDefaultClusterMessageLoadBalancingType());
 
    private URISupport.CompositeData compositeMembers;
 
@@ -74,6 +75,8 @@ public final class ClusterConnectionConfiguration implements 
Serializable {
 
    private int confirmationWindowSize = 
ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize();
 
+   private int producerWindowSize = 
ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
+
    private boolean allowDirectConnectionsOnly = false;
 
    private int minLargeMessageSize = 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
@@ -108,15 +111,15 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
       return this;
    }
 
+   public URISupport.CompositeData getCompositeMembers() {
+      return compositeMembers;
+   }
+
    public ClusterConnectionConfiguration 
setCompositeMembers(URISupport.CompositeData members) {
       this.compositeMembers = members;
       return this;
    }
 
-   public URISupport.CompositeData getCompositeMembers() {
-      return compositeMembers;
-   }
-
    /**
     * @return the clientFailureCheckPeriod
     */
@@ -125,6 +128,14 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
    }
 
    /**
+    * @param clientFailureCheckPeriod the clientFailureCheckPeriod to set
+    */
+   public ClusterConnectionConfiguration setClientFailureCheckPeriod(long 
clientFailureCheckPeriod) {
+      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+      return this;
+   }
+
+   /**
     * @return the connectionTTL
     */
    public long getConnectionTTL() {
@@ -132,6 +143,14 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
    }
 
    /**
+    * @param connectionTTL the connectionTTL to set
+    */
+   public ClusterConnectionConfiguration setConnectionTTL(long connectionTTL) {
+      this.connectionTTL = connectionTTL;
+      return this;
+   }
+
+   /**
     * @return the retryIntervalMultiplier
     */
    public double getRetryIntervalMultiplier() {
@@ -139,6 +158,14 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
    }
 
    /**
+    * @param retryIntervalMultiplier the retryIntervalMultiplier to set
+    */
+   public ClusterConnectionConfiguration setRetryIntervalMultiplier(double 
retryIntervalMultiplier) {
+      this.retryIntervalMultiplier = retryIntervalMultiplier;
+      return this;
+   }
+
+   /**
     * @return the maxRetryInterval
     */
    public long getMaxRetryInterval() {
@@ -146,6 +173,14 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
    }
 
    /**
+    * @param maxRetryInterval the maxRetryInterval to set
+    */
+   public ClusterConnectionConfiguration setMaxRetryInterval(long 
maxRetryInterval) {
+      this.maxRetryInterval = maxRetryInterval;
+      return this;
+   }
+
+   /**
     * @return the initialConnectAttempts
     */
    public int getInitialConnectAttempts() {
@@ -153,20 +188,52 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
    }
 
    /**
+    * @param initialConnectAttempts the reconnectAttempts to set
+    */
+   public ClusterConnectionConfiguration setInitialConnectAttempts(int 
initialConnectAttempts) {
+      this.initialConnectAttempts = initialConnectAttempts;
+      return this;
+   }
+
+   /**
     * @return the reconnectAttempts
     */
    public int getReconnectAttempts() {
       return reconnectAttempts;
    }
 
+   /**
+    * @param reconnectAttempts the reconnectAttempts to set
+    */
+   public ClusterConnectionConfiguration setReconnectAttempts(int 
reconnectAttempts) {
+      this.reconnectAttempts = reconnectAttempts;
+      return this;
+   }
+
    public long getCallTimeout() {
       return callTimeout;
    }
 
+   /**
+    * @param callTimeout the callTimeout to set
+    */
+   public ClusterConnectionConfiguration setCallTimeout(long callTimeout) {
+      this.callTimeout = callTimeout;
+      return this;
+   }
+
    public long getCallFailoverTimeout() {
       return callFailoverTimeout;
    }
 
+   /**
+    * @param callFailoverTimeout the callTimeout to set
+    */
+   public ClusterConnectionConfiguration setCallFailoverTimeout(long 
callFailoverTimeout) {
+      this.callFailoverTimeout = callFailoverTimeout;
+      return this;
+   }
+
    public String getConnectorName() {
       return connectorName;
    }
@@ -180,10 +247,27 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
       return duplicateDetection;
    }
 
+   /**
+    * @param duplicateDetection the duplicateDetection to set
+    */
+   public ClusterConnectionConfiguration setDuplicateDetection(boolean 
duplicateDetection) {
+      this.duplicateDetection = duplicateDetection;
+      return this;
+   }
+
    public MessageLoadBalancingType getMessageLoadBalancingType() {
       return messageLoadBalancingType;
    }
 
+   /**
+    * @param messageLoadBalancingType
+    * @return
+    */
+   public ClusterConnectionConfiguration 
setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
+      this.messageLoadBalancingType = messageLoadBalancingType;
+      return this;
+   }
+
    public int getMaxHops() {
       return maxHops;
    }
@@ -202,6 +286,15 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
       return this;
    }
 
+   public int getProducerWindowSize() {
+      return producerWindowSize;
+   }
+
+   public ClusterConnectionConfiguration setProducerindowSize(int 
producerWindowSize) {
+      this.producerWindowSize = producerWindowSize;
+      return this;
+   }
+
    public List<String> getStaticConnectors() {
       return staticConnectors;
    }
@@ -224,6 +317,14 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
       return retryInterval;
    }
 
+   /**
+    * @param retryInterval the retryInterval to set
+    */
+   public ClusterConnectionConfiguration setRetryInterval(long retryInterval) {
+      this.retryInterval = retryInterval;
+      return this;
+   }
+
    public boolean isAllowDirectConnectionsOnly() {
       return allowDirectConnectionsOnly;
    }
@@ -248,95 +349,6 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
       return this;
    }
 
-   /**
-    * @param clientFailureCheckPeriod the clientFailureCheckPeriod to set
-    */
-   public ClusterConnectionConfiguration setClientFailureCheckPeriod(long 
clientFailureCheckPeriod) {
-      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-      return this;
-   }
-
-   /**
-    * @param connectionTTL the connectionTTL to set
-    */
-   public ClusterConnectionConfiguration setConnectionTTL(long connectionTTL) {
-      this.connectionTTL = connectionTTL;
-      return this;
-   }
-
-   /**
-    * @param retryInterval the retryInterval to set
-    */
-   public ClusterConnectionConfiguration setRetryInterval(long retryInterval) {
-      this.retryInterval = retryInterval;
-      return this;
-   }
-
-   /**
-    * @param retryIntervalMultiplier the retryIntervalMultiplier to set
-    */
-   public ClusterConnectionConfiguration setRetryIntervalMultiplier(double 
retryIntervalMultiplier) {
-      this.retryIntervalMultiplier = retryIntervalMultiplier;
-      return this;
-   }
-
-   /**
-    * @param maxRetryInterval the maxRetryInterval to set
-    */
-   public ClusterConnectionConfiguration setMaxRetryInterval(long 
maxRetryInterval) {
-      this.maxRetryInterval = maxRetryInterval;
-      return this;
-   }
-
-   /**
-    * @param initialConnectAttempts the reconnectAttempts to set
-    */
-   public ClusterConnectionConfiguration setInitialConnectAttempts(int 
initialConnectAttempts) {
-      this.initialConnectAttempts = initialConnectAttempts;
-      return this;
-   }
-
-   /**
-    * @param reconnectAttempts the reconnectAttempts to set
-    */
-   public ClusterConnectionConfiguration setReconnectAttempts(int 
reconnectAttempts) {
-      this.reconnectAttempts = reconnectAttempts;
-      return this;
-   }
-
-   /**
-    * @param callTimeout the callTimeout to set
-    */
-   public ClusterConnectionConfiguration setCallTimeout(long callTimeout) {
-      this.callTimeout = callTimeout;
-      return this;
-   }
-
-   /**
-    * @param callFailoverTimeout the callTimeout to set
-    */
-   public ClusterConnectionConfiguration setCallFailoverTimeout(long 
callFailoverTimeout) {
-      this.callFailoverTimeout = callFailoverTimeout;
-      return this;
-   }
-
-   /**
-    * @param duplicateDetection the duplicateDetection to set
-    */
-   public ClusterConnectionConfiguration setDuplicateDetection(boolean 
duplicateDetection) {
-      this.duplicateDetection = duplicateDetection;
-      return this;
-   }
-
-   /**
-    * @param messageLoadBalancingType
-    * @return
-    */
-   public ClusterConnectionConfiguration 
setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
-      this.messageLoadBalancingType = messageLoadBalancingType;
-      return this;
-   }
-
    /*
    * returns the cluster update interval
    * */
@@ -457,77 +469,107 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
 
    @Override
    public boolean equals(Object obj) {
-      if (this == obj)
+      if (this == obj) {
          return true;
-      if (obj == null)
+      }
+      if (obj == null) {
          return false;
-      if (getClass() != obj.getClass())
+      }
+      if (getClass() != obj.getClass()) {
          return false;
+      }
       ClusterConnectionConfiguration other = (ClusterConnectionConfiguration) 
obj;
       if (address == null) {
-         if (other.address != null)
+         if (other.address != null) {
             return false;
+         }
       }
-      else if (!address.equals(other.address))
+      else if (!address.equals(other.address)) {
          return false;
-      if (allowDirectConnectionsOnly != other.allowDirectConnectionsOnly)
+      }
+      if (allowDirectConnectionsOnly != other.allowDirectConnectionsOnly) {
          return false;
-      if (callFailoverTimeout != other.callFailoverTimeout)
+      }
+      if (callFailoverTimeout != other.callFailoverTimeout) {
          return false;
-      if (callTimeout != other.callTimeout)
+      }
+      if (callTimeout != other.callTimeout) {
          return false;
-      if (clientFailureCheckPeriod != other.clientFailureCheckPeriod)
+      }
+      if (clientFailureCheckPeriod != other.clientFailureCheckPeriod) {
          return false;
-      if (clusterNotificationAttempts != other.clusterNotificationAttempts)
+      }
+      if (clusterNotificationAttempts != other.clusterNotificationAttempts) {
          return false;
-      if (clusterNotificationInterval != other.clusterNotificationInterval)
+      }
+      if (clusterNotificationInterval != other.clusterNotificationInterval) {
          return false;
-      if (confirmationWindowSize != other.confirmationWindowSize)
+      }
+      if (confirmationWindowSize != other.confirmationWindowSize) {
          return false;
-      if (connectionTTL != other.connectionTTL)
+      }
+      if (connectionTTL != other.connectionTTL) {
          return false;
+      }
       if (connectorName == null) {
-         if (other.connectorName != null)
+         if (other.connectorName != null) {
             return false;
+         }
       }
-      else if (!connectorName.equals(other.connectorName))
+      else if (!connectorName.equals(other.connectorName)) {
          return false;
+      }
       if (discoveryGroupName == null) {
-         if (other.discoveryGroupName != null)
+         if (other.discoveryGroupName != null) {
             return false;
+         }
       }
-      else if (!discoveryGroupName.equals(other.discoveryGroupName))
+      else if (!discoveryGroupName.equals(other.discoveryGroupName)) {
          return false;
-      if (duplicateDetection != other.duplicateDetection)
+      }
+      if (duplicateDetection != other.duplicateDetection) {
          return false;
-      if (messageLoadBalancingType != other.messageLoadBalancingType)
+      }
+      if (messageLoadBalancingType != other.messageLoadBalancingType) {
          return false;
-      if (maxHops != other.maxHops)
+      }
+      if (maxHops != other.maxHops) {
          return false;
-      if (maxRetryInterval != other.maxRetryInterval)
+      }
+      if (maxRetryInterval != other.maxRetryInterval) {
          return false;
-      if (minLargeMessageSize != other.minLargeMessageSize)
+      }
+      if (minLargeMessageSize != other.minLargeMessageSize) {
          return false;
+      }
       if (name == null) {
-         if (other.name != null)
+         if (other.name != null) {
             return false;
+         }
       }
-      else if (!name.equals(other.name))
+      else if (!name.equals(other.name)) {
          return false;
-      if (initialConnectAttempts != other.initialConnectAttempts)
+      }
+      if (initialConnectAttempts != other.initialConnectAttempts) {
          return false;
-      if (reconnectAttempts != other.reconnectAttempts)
+      }
+      if (reconnectAttempts != other.reconnectAttempts) {
          return false;
-      if (retryInterval != other.retryInterval)
+      }
+      if (retryInterval != other.retryInterval) {
          return false;
-      if (Double.doubleToLongBits(retryIntervalMultiplier) != 
Double.doubleToLongBits(other.retryIntervalMultiplier))
+      }
+      if (Double.doubleToLongBits(retryIntervalMultiplier) != 
Double.doubleToLongBits(other.retryIntervalMultiplier)) {
          return false;
+      }
       if (staticConnectors == null) {
-         if (other.staticConnectors != null)
+         if (other.staticConnectors != null) {
             return false;
+         }
       }
-      else if (!staticConnectors.equals(other.staticConnectors))
+      else if (!staticConnectors.equals(other.staticConnectors)) {
          return false;
+      }
       return true;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index b6e1c7b..ca4bb52 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1314,6 +1314,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       int confirmationWindowSize = getInteger(e, "confirmation-window-size", 
ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(), 
Validators.GT_ZERO);
 
+      int producerWindowSize = getInteger(e, "producer-window-size", 
ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(), 
Validators.MINUS_ONE_OR_GT_ZERO);
+
       long clusterNotificationInterval = getLong(e, "notification-interval", 
ActiveMQDefaultConfiguration.getDefaultClusterNotificationInterval(), 
Validators.GT_ZERO);
 
       int clusterNotificationAttempts = getInteger(e, "notification-attempts", 
ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), 
Validators.GT_ZERO);
@@ -1343,7 +1345,28 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
          }
       }
 
-      ClusterConnectionConfiguration config = new 
ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setDuplicateDetection(duplicateDetection).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(confirmationWindowSize).setAllowDirectConnectionsOnly(allowDirectConnectionsOnly).setClusterNotificationInterval(clusterNotificationInterval).setClusterNotificationAttempts(clusterNotificationAttempts);
+      ClusterConnectionConfiguration config = new 
ClusterConnectionConfiguration()
+         .setName(name)
+         .setAddress(address)
+         .setConnectorName(connectorName)
+         .setMinLargeMessageSize(minLargeMessageSize)
+         .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+         .setConnectionTTL(connectionTTL)
+         .setRetryInterval(retryInterval)
+         .setRetryIntervalMultiplier(retryIntervalMultiplier)
+         .setMaxRetryInterval(maxRetryInterval)
+         .setInitialConnectAttempts(initialConnectAttempts)
+         .setReconnectAttempts(reconnectAttempts)
+         .setCallTimeout(callTimeout)
+         .setCallFailoverTimeout(callFailoverTimeout)
+         .setDuplicateDetection(duplicateDetection)
+         .setMessageLoadBalancingType(messageLoadBalancingType)
+         .setMaxHops(maxHops)
+         .setConfirmationWindowSize(confirmationWindowSize)
+         .setProducerindowSize(producerWindowSize)
+         .setAllowDirectConnectionsOnly(allowDirectConnectionsOnly)
+         .setClusterNotificationInterval(clusterNotificationInterval)
+         .setClusterNotificationAttempts(clusterNotificationAttempts);
 
       if (discoveryGroupName == null) {
          config.setStaticConnectors(staticConnectorNames);
@@ -1377,6 +1400,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
       // Default bridge conf
       int confirmationWindowSize = getInteger(brNode, 
"confirmation-window-size", 
ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(), 
Validators.GT_ZERO);
 
+      int producerWindowSize = getInteger(brNode, "producer-window-size", 
ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(), 
Validators.GT_ZERO);
+
       long retryInterval = getLong(brNode, "retry-interval", 
ActiveMQClient.DEFAULT_RETRY_INTERVAL, Validators.GT_ZERO);
 
       long clientFailureCheckPeriod = getLong(brNode, "check-period", 
ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, Validators.GT_ZERO);
@@ -1444,7 +1469,27 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
          }
       }
 
-      BridgeConfiguration config = new 
BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
+      BridgeConfiguration config = new BridgeConfiguration()
+         .setName(name)
+         .setQueueName(queueName)
+         .setForwardingAddress(forwardingAddress)
+         .setFilterString(filterString)
+         .setTransformerClassName(transformerClassName)
+         .setMinLargeMessageSize(minLargeMessageSize)
+         .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+         .setConnectionTTL(connectionTTL)
+         .setRetryInterval(retryInterval)
+         .setMaxRetryInterval(maxRetryInterval)
+         .setRetryIntervalMultiplier(retryIntervalMultiplier)
+         .setInitialConnectAttempts(initialConnectAttempts)
+         .setReconnectAttempts(reconnectAttempts)
+         .setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode)
+         .setUseDuplicateDetection(useDuplicateDetection)
+         .setConfirmationWindowSize(confirmationWindowSize)
+         .setProducerWindowSize(producerWindowSize)
+         .setHA(ha)
+         .setUser(user)
+         .setPassword(password);
 
       if (!staticConnectorNames.isEmpty()) {
          config.setStaticConnectors(staticConnectorNames);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 4c47e74..8202f14 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -1722,6 +1722,7 @@ public class ActiveMQServerControlImpl extends 
AbstractControl implements Active
                             final int reconnectAttempts,
                             final boolean useDuplicateDetection,
                             final int confirmationWindowSize,
+                            final int producerWindowSize,
                             final long clientFailureCheckPeriod,
                             final String staticConnectorsOrDiscoveryGroup,
                             boolean useDiscoveryGroup,
@@ -1733,7 +1734,23 @@ public class ActiveMQServerControlImpl extends 
AbstractControl implements Active
       clearIO();
 
       try {
-         BridgeConfiguration config = new 
BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
+         BridgeConfiguration config = new BridgeConfiguration()
+            .setName(name)
+            .setQueueName(queueName)
+            .setForwardingAddress(forwardingAddress)
+            .setFilterString(filterString)
+            .setTransformerClassName(transformerClassName)
+            .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+            .setRetryInterval(retryInterval)
+            .setRetryIntervalMultiplier(retryIntervalMultiplier)
+            .setInitialConnectAttempts(initialConnectAttempts)
+            .setReconnectAttempts(reconnectAttempts)
+            .setUseDuplicateDetection(useDuplicateDetection)
+            .setConfirmationWindowSize(confirmationWindowSize)
+            .setProducerWindowSize(producerWindowSize)
+            .setHA(ha)
+            .setUser(user)
+            .setPassword(password);
 
          if (useDiscoveryGroup) {
             config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 5f3c44b..ef39ba2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -465,8 +465,7 @@ public final class ClusterManager implements 
ActiveMQComponent {
       serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
       
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
       serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
-      //disable flow control
-      serverLocator.setProducerWindowSize(-1);
+      serverLocator.setProducerWindowSize(config.getProducerWindowSize());
 
       // This will be set to 30s unless it's changed from embedded / testing
       // there is no reason to exception the config for this timeout
@@ -615,7 +614,7 @@ public final class ClusterManager implements 
ActiveMQComponent {
                                                  dg);
          }
 
-         clusterConnection = new ClusterConnectionImpl(this, dg, connector, 
new SimpleString(config.getName()), new SimpleString(config.getAddress()), 
config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), 
config.getConnectionTTL(), config.getRetryInterval(), 
config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), 
config.getInitialConnectAttempts(), config.getReconnectAttempts(), 
config.getCallTimeout(), config.getCallFailoverTimeout(), 
config.isDuplicateDetection(), config.getMessageLoadBalancingType(), 
config.getConfirmationWindowSize(), executorFactory, server, postOffice, 
managementService, scheduledExecutor, config.getMaxHops(), nodeManager, 
server.getConfiguration().getClusterUser(), 
server.getConfiguration().getClusterPassword(), 
config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), 
config.getClusterNotificationAttempts());
+         clusterConnection = new ClusterConnectionImpl(this, dg, connector, 
new SimpleString(config.getName()), new SimpleString(config.getAddress()), 
config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), 
config.getConnectionTTL(), config.getRetryInterval(), 
config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), 
config.getInitialConnectAttempts(), config.getReconnectAttempts(), 
config.getCallTimeout(), config.getCallFailoverTimeout(), 
config.isDuplicateDetection(), config.getMessageLoadBalancingType(), 
config.getConfirmationWindowSize(), config.getProducerWindowSize(), 
executorFactory, server, postOffice, managementService, scheduledExecutor, 
config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), 
server.getConfiguration().getClusterPassword(), 
config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), 
config.getClusterNotificationAttempts());
 
          clusterController.addClusterConnection(clusterConnection.getName(), 
dg, config);
       }
@@ -626,7 +625,7 @@ public final class ClusterManager implements 
ActiveMQComponent {
             logger.debug(this + " defining cluster connection towards " + 
Arrays.toString(tcConfigs));
          }
 
-         clusterConnection = new ClusterConnectionImpl(this, tcConfigs, 
connector, new SimpleString(config.getName()), new 
SimpleString(config.getAddress()), config.getMinLargeMessageSize(), 
config.getClientFailureCheckPeriod(), config.getConnectionTTL(), 
config.getRetryInterval(), config.getRetryIntervalMultiplier(), 
config.getMaxRetryInterval(), config.getInitialConnectAttempts(), 
config.getReconnectAttempts(), config.getCallTimeout(), 
config.getCallFailoverTimeout(), config.isDuplicateDetection(), 
config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), 
executorFactory, server, postOffice, managementService, scheduledExecutor, 
config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), 
server.getConfiguration().getClusterPassword(), 
config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), 
config.getClusterNotificationAttempts());
+         clusterConnection = new ClusterConnectionImpl(this, tcConfigs, 
connector, new SimpleString(config.getName()), new 
SimpleString(config.getAddress()), config.getMinLargeMessageSize(), 
config.getClientFailureCheckPeriod(), config.getConnectionTTL(), 
config.getRetryInterval(), config.getRetryIntervalMultiplier(), 
config.getMaxRetryInterval(), config.getInitialConnectAttempts(), 
config.getReconnectAttempts(), config.getCallTimeout(), 
config.getCallFailoverTimeout(), config.isDuplicateDetection(), 
config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), 
config.getProducerWindowSize(), executorFactory, server, postOffice, 
managementService, scheduledExecutor, config.getMaxHops(), nodeManager, 
server.getConfiguration().getClusterUser(), 
server.getConfiguration().getClusterPassword(), 
config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), 
config.getClusterNotificationAttempts());
 
          clusterController.addClusterConnection(clusterConnection.getName(), 
tcConfigs, config);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 82b1e4f..d009e79 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -119,6 +119,8 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
 
    private final int confirmationWindowSize;
 
+   private final int producerWindowSize;
+
    /**
     * Guard for the field {@link #records}. Note that the field is {@link 
ConcurrentHashMap},
     * however we need the guard to synchronize multiple step operations during 
topology updates.
@@ -184,6 +186,7 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
                                 final boolean useDuplicateDetection,
                                 final MessageLoadBalancingType 
messageLoadBalancingType,
                                 final int confirmationWindowSize,
+                                final int producerWindowSize,
                                 final ExecutorFactory executorFactory,
                                 final ActiveMQServer server,
                                 final PostOffice postOffice,
@@ -224,6 +227,8 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
 
       this.confirmationWindowSize = confirmationWindowSize;
 
+      this.producerWindowSize = producerWindowSize;
+
       this.executorFactory = executorFactory;
 
       this.clusterNotificationInterval = clusterNotificationInterval;
@@ -290,6 +295,7 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
                                 final boolean useDuplicateDetection,
                                 final MessageLoadBalancingType 
messageLoadBalancingType,
                                 final int confirmationWindowSize,
+                                final int producerWindowSize,
                                 final ExecutorFactory executorFactory,
                                 final ActiveMQServer server,
                                 final PostOffice postOffice,
@@ -336,6 +342,8 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
 
       this.confirmationWindowSize = confirmationWindowSize;
 
+      this.producerWindowSize = producerWindowSize;
+
       this.executorFactory = executorFactory;
 
       this.clusterNotificationInterval = clusterNotificationInterval;
@@ -601,8 +609,7 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
          serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
          serverLocator.setCallTimeout(callTimeout);
          serverLocator.setCallFailoverTimeout(callFailoverTimeout);
-         // No producer flow control on the bridges, as we don't want to lock 
the queues
-         serverLocator.setProducerWindowSize(-1);
+         serverLocator.setProducerWindowSize(producerWindowSize);
 
          if (retryInterval > 0) {
             this.serverLocator.setRetryInterval(retryInterval);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 93d2a9e..0d369ba 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1116,6 +1116,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" 
minOccurs="0" default="-1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Producer flow control window size; -1 (the default) disables producer flow control
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="user" type="xsd:string" maxOccurs="1" 
minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
@@ -1341,6 +1349,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" 
minOccurs="0" default="-1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Producer flow control window size; -1 (the default) disables producer flow control
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="call-failover-timeout" type="xsd:long" 
default="-1" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 63962fb..f7db6e7 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -215,6 +215,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
             Assert.assertEquals(true, bc.isUseDuplicateDetection());
             Assert.assertEquals("connector1", bc.getStaticConnectors().get(0));
             Assert.assertEquals(null, bc.getDiscoveryGroupName());
+            Assert.assertEquals(444, bc.getProducerWindowSize());
          }
          else {
             Assert.assertEquals("bridge2", bc.getName());
@@ -224,6 +225,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
             Assert.assertEquals(null, bc.getTransformerClassName());
             Assert.assertEquals(null, bc.getStaticConnectors());
             Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
+            Assert.assertEquals(555, bc.getProducerWindowSize());
          }
       }
 
@@ -256,6 +258,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
             Assert.assertEquals("connector1", 
ccc.getStaticConnectors().get(0));
             Assert.assertEquals("connector2", 
ccc.getStaticConnectors().get(1));
             Assert.assertEquals(null, ccc.getDiscoveryGroupName());
+            Assert.assertEquals(222, ccc.getProducerWindowSize());
          }
          else {
             Assert.assertEquals("cluster-connection2", ccc.getName());
@@ -268,6 +271,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
             Assert.assertEquals(2, ccc.getMaxHops());
             Assert.assertEquals(Collections.emptyList(), 
ccc.getStaticConnectors());
             Assert.assertEquals("dg1", ccc.getDiscoveryGroupName());
+            Assert.assertEquals(333, ccc.getProducerWindowSize());
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index ec6d6e9..9304745 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -142,6 +142,7 @@
             <reconnect-attempts>2</reconnect-attempts>
             <failover-on-server-shutdown>false</failover-on-server-shutdown>
             <use-duplicate-detection>true</use-duplicate-detection>
+            <producer-window-size>444</producer-window-size>
             <static-connectors>
                <connector-ref>connector1</connector-ref>
             </static-connectors>
@@ -149,6 +150,7 @@
          <bridge name="bridge2">
             <queue-name>queue2</queue-name>
             <forwarding-address>bridge-forwarding-address2</forwarding-address>
+            <producer-window-size>555</producer-window-size>
             <discovery-group-ref discovery-group-name="dg1"/>
          </bridge>
       </bridges>
@@ -180,6 +182,7 @@
             <use-duplicate-detection>true</use-duplicate-detection>
             <message-load-balancing>ON_DEMAND</message-load-balancing>
             <max-hops>1</max-hops>
+            <producer-window-size>222</producer-window-size>
             <call-failover-timeout>123</call-failover-timeout>
             <static-connectors>
                <connector-ref>connector1</connector-ref>
@@ -194,6 +197,7 @@
             <use-duplicate-detection>false</use-duplicate-detection>
             <message-load-balancing>STRICT</message-load-balancing>
             <max-hops>2</max-hops>
+            <producer-window-size>333</producer-window-size>
             <call-failover-timeout>456</call-failover-timeout>
             <discovery-group-ref discovery-group-name="dg1"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index afd7697..0da4561 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.bridge;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1621,6 +1627,195 @@ public class BridgeTest extends ActiveMQTestBase {
    }
 
    @Test
+   /**
+    * Sends a message whose requested body size exceeds {@link Integer#MAX_VALUE} through a bridge
+    * from {@code queue0} on server 0 to {@code forwardAddress} on server 1.
+    * <p>
+    * The bridge is configured with a producer window of half the minimum large-message size. The
+    * test polls the target queue for up to 300 seconds, streams the received body to a file in the
+    * temporary directory and then checks that no further message is pending.
+    *
+    * @throws Exception if server setup, sending or receiving fails
+    */
+   public void testBridgeWithVeryLargeMessage() throws Exception {
+      ActiveMQServer server0 = null;
+      ActiveMQServer server1 = null;
+
+      final int PAGE_MAX = 1024 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+      ServerLocator locator = null;
+
+      try {
+         Map<String, Object> server0Params = new HashMap<>();
+         server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+         Map<String, Object> server1Params = new HashMap<>();
+         addTargetParameters(server1Params);
+         server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+         final String testAddress = "testAddress";
+         final String queueName0 = "queue0";
+         final String forwardAddress = "forwardAddress";
+         final String queueName1 = "queue1";
+
+         Map<String, TransportConfiguration> connectors = new HashMap<>();
+         TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+         TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+         connectors.put(server1tc.getName(), server1tc);
+
+         server0.getConfiguration().setConnectorConfigurations(connectors);
+
+         ArrayList<String> staticConnectors = new ArrayList<>();
+         staticConnectors.add(server1tc.getName());
+
+         int minLargeMessageSize = 1024 * 1024;
+
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
+            .setName("bridge1")
+            .setQueueName(queueName0)
+            .setForwardingAddress(forwardAddress)
+            .setRetryInterval(1000)
+            .setReconnectAttemptsOnSameNode(-1)
+            .setUseDuplicateDetection(false)
+            .setConfirmationWindowSize(1024)
+            .setStaticConnectors(staticConnectors)
+            .setMinLargeMessageSize(minLargeMessageSize)
+            .setProducerWindowSize(minLargeMessageSize / 2);
+
+         List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
+         bridgeConfigs.add(bridgeConfiguration);
+         server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
+         List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
+         queueConfigs0.add(queueConfig0);
+         server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration()
+            .setAddress(forwardAddress)
+            .setName(queueName1);
+         List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
+         queueConfigs1.add(queueConfig1);
+         server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+         server1.start();
+         server0.start();
+
+         locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+
+         ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+         ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+         ClientSession session0 = sf0.createSession(false, true, true);
+
+         ClientSession session1 = sf1.createSession(false, true, true);
+
+         ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+         ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+         session1.start();
+
+         //create a large message bigger than Integer.MAX_VALUE
+         final long largeMessageSize = Integer.MAX_VALUE + 1000L;
+
+         ClientMessage largeMessage = createLargeMessage(session0, largeMessageSize);
+
+         producer0.send(largeMessage);
+
+         session0.commit();
+
+         //check target queue for large message arriving
+         ClientSession.QueueQuery query = session1.queueQuery(new SimpleString(queueName1));
+         long messageCount = query.getMessageCount();
+         int count = 0;
+         //wait for 300 sec max
+         while (messageCount == 0 && count < 300) {
+            count++;
+            Thread.sleep(1000);
+            query = session1.queueQuery(new SimpleString(queueName1));
+            messageCount = query.getMessageCount();
+         }
+
+         if (messageCount == 0) {
+            fail("large message didn't arrived after 5 min!");
+         }
+
+         //receive the message
+         ClientMessage message = consumer1.receive(5000);
+         // receive() returns null on timeout; fail with a clear message instead of an NPE
+         Assert.assertNotNull("no message received from " + queueName1 + " within 5 sec", message);
+         message.acknowledge();
+
+         File outputFile = new File(getTemporaryDir(), "huge_message_received.dat");
+
+         System.out.println("-----message save to: " + outputFile.getAbsolutePath());
+
+         // try-with-resources releases the file handle even when the transfer fails or times out
+         try (BufferedOutputStream bufferedOutput = new BufferedOutputStream(new FileOutputStream(outputFile))) {
+            message.setOutputStream(bufferedOutput);
+
+            if (!message.waitOutputStreamCompletion(5 * 60 * 1000)) {
+               fail("message didn't get received to disk in 5 min. Is the machine slow?");
+            }
+         }
+         session1.commit();
+
+         Assert.assertNull(consumer1.receiveImmediate());
+
+         session0.close();
+
+         session1.close();
+
+         sf0.close();
+
+         sf1.close();
+
+      }
+      finally {
+         if (locator != null) {
+            locator.close();
+         }
+         // a server may never have been created if setup failed early
+         if (server0 != null) {
+            try {
+               server0.stop();
+            }
+            catch (Throwable ignored) {
+               // best-effort shutdown; a failure here must not mask the test result
+            }
+         }
+
+         if (server1 != null) {
+            try {
+               server1.stop();
+            }
+            catch (Throwable ignored) {
+               // best-effort shutdown; a failure here must not mask the test result
+            }
+         }
+      }
+
+      assertEquals(0, loadQueues(server0).size());
+   }
+
+   /**
+    * Builds a bytes message whose body is streamed from the file
+    * {@code huge_message_to_send.dat} in the temporary directory.
+    * <p>
+    * The file is prepared by {@code createFile(file, largeMessageSize)} before the message is created.
+    *
+    * @param session          the session used to create the message
+    * @param largeMessageSize the size passed to {@code createFile} for the backing file
+    * @return the message with its body input stream set
+    * @throws Exception if the backing file cannot be created or opened
+    */
+   private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize) throws Exception {
+      final File sourceFile = new File(getTemporaryDir(), "huge_message_to_send.dat");
+      createFile(sourceFile, largeMessageSize);
+      System.out.println("File created at: " + sourceFile.getAbsolutePath());
+
+      final ClientMessage largeMessage = session.createMessage(ClientMessage.BYTES_TYPE, true);
+
+      // The stream is not closed here; presumably it is consumed when the message is sent.
+      largeMessage.setBodyInputStream(new BufferedInputStream(new FileInputStream(sourceFile)));
+
+      return largeMessage;
+   }
+
+   /**
+    * Creates a zero-filled file of exactly {@code fileSize} bytes.
+    * <p>
+    * An existing file is reused only when it already has the requested length. Otherwise it is
+    * rewritten, so a partial file left behind by an aborted run is never picked up.
+    *
+    * @param file     the file to create
+    * @param fileSize the required length in bytes
+    * @throws IOException if the file cannot be opened or written
+    */
+   private static void createFile(final File file, final long fileSize) throws IOException {
+      if (file.exists() && file.length() == fileSize) {
+         System.out.println("---file already there " + file.length());
+         return;
+      }
+      final byte[] outBuffer = new byte[1024 * 1024];
+      System.out.println(" --- creating file, size: " + fileSize);
+      // try-with-resources flushes and closes the stream even when a write fails
+      try (BufferedOutputStream buffOut = new BufferedOutputStream(new FileOutputStream(file))) {
+         long remaining = fileSize;
+         while (remaining > 0) {
+            // the final chunk may be shorter than the buffer so the length is not rounded up
+            final int chunk = (int) Math.min(outBuffer.length, remaining);
+            buffOut.write(outBuffer, 0, chunk);
+            remaining -= chunk;
+         }
+      }
+   }
+
+   @Test
    public void testNullForwardingAddress() throws Exception {
       Map<String, Object> server0Params = new HashMap<>();
       server0 = createClusteredServerWithParams(isNetty(), 0, false, 
server0Params);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index aa4d685..0e44bae 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -667,6 +667,7 @@ public class ActiveMQServerControlTest extends 
ManagementTestBase {
                                  null, // filterString
                                  ActiveMQClient.DEFAULT_RETRY_INTERVAL, 
ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, 
ActiveMQClient.INITIAL_CONNECT_ATTEMPTS, 
ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, false, // duplicateDetection
                                  1, // confirmationWindowSize
+                                 -1, // producerWindowSize
                                  
ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, connectorConfig.getName(), 
// liveConnector
                                  false, false, null, null);
 

Reply via email to