gemmellr commented on code in PR #4605:
URL: https://github.com/apache/activemq-artemis/pull/4605#discussion_r1318823508


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -239,7 +271,44 @@ SimpleString 
getMirrorSNF(AMQPMirrorBrokerConnectionElement mirrorElement) {
       return snf;
    }
 
+   /**
+    * Adds a remote link closed event intercepter that can intercept the 
closed event and if it
+    * returns true indicate that the close has been handled and that normal 
broker connection
+    * remote link closed handling should be ignored.
+    *
+    * @param id
+    *    A unique Id value that identifies the intercepter for later removal.
+    * @param intercepter
+    *    The predicate that will be called for any link close.
+    *
+    * @return this broker connection instance.
+    */
+   public AMQPBrokerConnection addLinkClosedIntercepter(String id, 
Predicate<Link> intercepter) {

Review Comment:
   Rename "Intercepter" -> "Interceptor" (method name, parameter and Javadoc) for consistency with the spelling used in other areas?



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java:
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+
+import java.util.AbstractMap;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
+import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
+import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_NAME;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_EXCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDE_FEDERATED;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_PRIORITY_ADJUSTMENT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_CLASS_NAME;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_PROPERTIES_MAP;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_PROPERTIES_MAP;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_ADDRESS_POLICY;
+
+/**
+ * Tools used when loading AMQP Broker connections configuration that includes 
Federation
+ * configuration.
+ */
+public final class AMQPFederationPolicySupport {
+
+   /**
+    * Default priority adjustment used for a federation queue match policy if 
nothing
+    * was configured in the broker configuration file.
+    */
+   public static final int DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT = -1;
+
+   /**
+    * Annotation added to received messages from address consumers that 
indicates how many
+    * times the message has traversed a federation link.
+    */
+   public static final Symbol MESSAGE_HOPS_ANNOTATION = 
Symbol.valueOf("x-opt-amq-fed-hops");
+
+   /**
+    * Property name used to embed a nested map of properties meant to be 
applied if the address
+    * indicated in a federation address receiver auto creates the federated 
address.
+    */
+   public static final Symbol FEDERATED_ADDRESS_SOURCE_PROPERTIES = 
Symbol.valueOf("federated-address-source-properties");
+
+   /**
+    * Create an AMQP Message used to instruct the remote peer that it should 
perform
+    * Federation operations on the given {@link 
FederationReceiveFromQueuePolicy}.
+    *
+    * @param policy
+    *    The policy to encode into an AMQP message.
+    *
+    * @return an AMQP Message with the encoded policy.
+    */
+   public static AMQPMessage 
encodeQueuePolicyControlMessage(FederationReceiveFromQueuePolicy policy) {
+      final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+      final MessageAnnotations messageAnnotations = new 
MessageAnnotations(annotations);
+      final Map<String, Object> policyMap = new LinkedHashMap<>();
+      final Section sectionBody = new AmqpValue(policyMap);
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      annotations.put(OPERATION_TYPE, ADD_QUEUE_POLICY);
+
+      policyMap.put(POLICY_NAME, policy.getPolicyName());
+      policyMap.put(QUEUE_INCLUDE_FEDERATED, policy.isIncludeFederated());
+      policyMap.put(QUEUE_PRIORITY_ADJUSTMENT, policy.getPriorityAjustment());
+
+      if (!policy.getIncludes().isEmpty()) {
+         final List<String> flattenedIncludes = new 
ArrayList<>(policy.getIncludes().size() * 2);
+         policy.getIncludes().forEach((entry) -> {
+            flattenedIncludes.add(entry.getKey());
+            flattenedIncludes.add(entry.getValue());
+         });
+
+         policyMap.put(QUEUE_INCLUDES, flattenedIncludes);
+      }
+
+      if (!policy.getExcludes().isEmpty()) {
+         final List<String> flatteneExcludes = new 
ArrayList<>(policy.getExcludes().size() * 2);
+         policy.getExcludes().forEach((entry) -> {
+            flatteneExcludes.add(entry.getKey());
+            flatteneExcludes.add(entry.getValue());
+         });
+
+         policyMap.put(QUEUE_EXCLUDES, flatteneExcludes);
+      }
+
+      if (!policy.getProperties().isEmpty()) {
+         policyMap.put(POLICY_PROPERTIES_MAP, policy.getProperties());
+      }
+
+      if (policy.getTransformerConfiguration() != null) {
+         final TransformerConfiguration config = 
policy.getTransformerConfiguration();
+
+         policyMap.put(TRANSFORMER_CLASS_NAME, config.getClassName());
+         if (!config.getProperties().isEmpty()) {
+            policyMap.put(TRANSFORMER_PROPERTIES_MAP, config.getProperties());
+         }
+      }
+
+      try {
+         final EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(messageAnnotations);
+         encoder.writeObject(sectionBody);
+
+         final byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPStandardMessage(0, data, null);
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   /**
+    * Create an AMQP Message used to instruct the remote peer that it should 
perform
+    * Federation operations on the given {@link 
FederationReceiveFromAddressPolicy}.
+    *
+    * @param policy
+    *    The policy to encode into an AMQP message.
+    *
+    * @return an AMQP Message with the encoded policy.
+    */
+   public static AMQPMessage 
encodeAddressPolicyControlMessage(FederationReceiveFromAddressPolicy policy) {
+      final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+      final MessageAnnotations messageAnnotations = new 
MessageAnnotations(annotations);
+      final Map<String, Object> policyMap = new LinkedHashMap<>();
+      final Section sectionBody = new AmqpValue(policyMap);
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      annotations.put(OPERATION_TYPE, ADD_ADDRESS_POLICY);
+
+      policyMap.put(POLICY_NAME, policy.getPolicyName());
+      policyMap.put(ADDRESS_AUTO_DELETE, policy.isAutoDelete());
+      policyMap.put(ADDRESS_AUTO_DELETE_DELAY, policy.getAutoDeleteDelay());
+      policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 
policy.getAutoDeleteMessageCount());
+      policyMap.put(ADDRESS_MAX_HOPS, policy.getMaxHops());
+      policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, 
policy.isEnableDivertBindings());
+      if (!policy.getIncludes().isEmpty()) {
+         policyMap.put(ADDRESS_INCLUDES, new 
ArrayList<>(policy.getIncludes()));
+      }
+      if (!policy.getExcludes().isEmpty()) {
+         policyMap.put(ADDRESS_EXCLUDES, new 
ArrayList<>(policy.getExcludes()));
+      }
+
+      if (!policy.getProperties().isEmpty()) {
+         policyMap.put(POLICY_PROPERTIES_MAP, policy.getProperties());
+      }
+
+      if (policy.getTransformerConfiguration() != null) {
+         final TransformerConfiguration config = 
policy.getTransformerConfiguration();
+
+         policyMap.put(TRANSFORMER_CLASS_NAME, config.getClassName());
+         if (!config.getProperties().isEmpty()) {
+            policyMap.put(TRANSFORMER_PROPERTIES_MAP, config.getProperties());
+         }
+      }
+
+      try {
+         final EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(messageAnnotations);
+         encoder.writeObject(sectionBody);
+
+         final byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPStandardMessage(0, data, null);
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   /**
+    * Given an AMQP Message decode an {@link FederationReceiveFromQueuePolicy} 
from it and return
+    * the decoded value. The message should have already been inspected and 
determined to be a
+    * control message of the add to policy type.
+    *
+    * @param message
+    *    The {@link AMQPMessage} that should carry an encoded {@link 
FederationReceiveFromQueuePolicy}
+    * @param wildcardConfig
+    *    The {@link WildcardConfiguration} to use in the decoded policy.
+    *
+    * @return a decoded {@link FederationReceiveFromQueuePolicy} instance.
+    *
+    * @throws ActiveMQException if an error occurs while decoding the policy.
+    */
+   @SuppressWarnings("unchecked")
+   public static FederationReceiveFromQueuePolicy 
decodeReceiveFromQueuePolicy(AMQPMessage message, WildcardConfiguration 
wildcardConfig) throws ActiveMQException {
+      final Section body = message.getBody();
+
+      if (!(body instanceof AmqpValue)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body was not an AmqpValue type");
+      }
+
+      final AmqpValue bodyValue = (AmqpValue) body;
+
+      if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof 
Map)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body AmqpValue did not carry an encoded Map");
+      }
+
+      try {
+         final Map<String, Object> policyMap = (Map<String, Object>) 
bodyValue.getValue();
+
+         if (!policyMap.containsKey(POLICY_NAME)) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required policy name");
+         }
+
+         final String policyName = (String) policyMap.get(POLICY_NAME);
+         final boolean includeFederated = (boolean) 
policyMap.getOrDefault(QUEUE_INCLUDE_FEDERATED, false);
+         final int priorityAdjustment = ((Number) 
policyMap.getOrDefault(QUEUE_PRIORITY_ADJUSTMENT, 
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)).intValue();
+         final Set<Map.Entry<String, String>> includes = 
decodeFlattenedFilterSet(policyMap, QUEUE_INCLUDES);
+         final Set<Map.Entry<String, String>> excludes = 
decodeFlattenedFilterSet(policyMap, QUEUE_EXCLUDES);
+         final TransformerConfiguration transformerConfig;
+
+         if (policyMap.containsKey(TRANSFORMER_CLASS_NAME)) {
+            transformerConfig = new TransformerConfiguration();
+            transformerConfig.setClassName((String) 
policyMap.get(TRANSFORMER_CLASS_NAME));
+            transformerConfig.setProperties((Map<String, String>) 
policyMap.get(TRANSFORMER_PROPERTIES_MAP));
+         } else {
+            transformerConfig = null;
+         }
+
+         final Map<String, Object> properties;
+
+         if (policyMap.containsKey(POLICY_PROPERTIES_MAP)) {
+            properties = (Map<String, Object>) 
policyMap.get(POLICY_PROPERTIES_MAP);
+         } else {
+            properties = null;
+         }
+
+         return new FederationReceiveFromQueuePolicy(policyName, 
includeFederated, priorityAdjustment,
+                                                     includes, excludes, 
properties, transformerConfig,
+                                                     wildcardConfig);
+      } catch (ActiveMQException amqEx) {
+         throw amqEx;
+      } catch (Exception e) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded queue policy entry: " + e.getMessage());
+      }
+   }
+
+   @SuppressWarnings("unchecked")
+   private static Set<Map.Entry<String, String>> 
decodeFlattenedFilterSet(Map<String, Object> policyMap, String target) throws 
ActiveMQException {
+      final Object encodedObject = policyMap.get(target);
+
+      if (encodedObject == null) {
+         return Collections.EMPTY_SET;
+      }
+
+      if (!(encodedObject instanceof List)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Encoded queue policy entry was not the expected List type : " + 
target);
+      }
+
+      final Set<Map.Entry<String, String>> policyEntrySet;
+
+      try {
+         final List<String> flattenedEntrySet = (List<String>) encodedObject;
+
+         if (flattenedEntrySet.isEmpty()) {
+            return Collections.EMPTY_SET;
+         }
+
+         if ((flattenedEntrySet.size() & 1) != 0) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Encoded queue policy entry was must contain an even number of 
elements : " + target);
+         }
+
+         policyEntrySet = new HashSet<>(Math.max(2, flattenedEntrySet.size() / 
2));
+
+         for (int i = 0; i < flattenedEntrySet.size(); ) {
+            policyEntrySet.add(new SimpleEntry<>(flattenedEntrySet.get(i++), 
flattenedEntrySet.get(i++)));
+         }
+
+      } catch (ActiveMQException amqEx) {
+         throw amqEx;
+      } catch (Exception e) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded queue policy entry: " + e.getMessage());
+      }
+
+      return policyEntrySet;
+   }
+
+   /**
+    * Given an AMQP Message decode an {@link 
FederationReceiveFromAddressPolicy} from it and return
+    * the decoded value. The message should have already been inspected and 
determined to be a
+    * control message of the add to policy type.
+    *
+    * @param message
+    *    The {@link AMQPMessage} that should carry an encoded {@link 
FederationReceiveFromQueuePolicy}
+    * @param wildcardConfig
+    *    The {@link WildcardConfiguration} to use in the decoded policy.
+    *
+    * @return a decoded {@link FederationReceiveFromAddressPolicy} instance.
+    *
+    * @throws ActiveMQException if an error occurs during the policy decode.
+    */
+   @SuppressWarnings("unchecked")
+   public static FederationReceiveFromAddressPolicy 
decodeReceiveFromAddressPolicy(AMQPMessage message, WildcardConfiguration 
wildcardConfig) throws ActiveMQException {
+      final Section body = message.getBody();
+
+      if (!(body instanceof AmqpValue)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body was not an AmqpValue type");
+      }
+
+      final AmqpValue bodyValue = (AmqpValue) body;
+
+      if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof 
Map)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body AmqpValue did not carry an encoded Map");
+      }
+
+      try {
+         final Map<String, Object> policyMap = (Map<String, Object>) 
bodyValue.getValue();
+
+         if (!policyMap.containsKey(POLICY_NAME)) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required policy name");
+         }
+
+         if (!policyMap.containsKey(ADDRESS_MAX_HOPS)) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required max hops 
configuration");
+         }
+
+         final String policyName = (String) policyMap.get(POLICY_NAME);
+         final boolean autoDelete = (Boolean) 
policyMap.getOrDefault(ADDRESS_AUTO_DELETE, false);
+         final long autoDeleteDelay = ((Number) 
policyMap.getOrDefault(ADDRESS_AUTO_DELETE_DELAY, 0L)).longValue();
+         final long autoDeleteMsgCount = ((Number) 
policyMap.getOrDefault(ADDRESS_AUTO_DELETE_MSG_COUNT, 0L)).longValue();
+         final int maxHops = ((Number) 
policyMap.get(ADDRESS_MAX_HOPS)).intValue();
+         final boolean enableDiverts = (Boolean) 
policyMap.getOrDefault(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+
+         final Set<String> includes;
+         final Set<String> excludes;
+
+         if (policyMap.containsKey(ADDRESS_INCLUDES)) {
+            includes = (Set<String>) new 
HashSet<>((List<String>)policyMap.get(ADDRESS_INCLUDES));
+         } else {
+            includes = Collections.EMPTY_SET;
+         }
+
+         if (policyMap.containsKey(ADDRESS_EXCLUDES)) {
+            excludes = (Set<String>) new 
HashSet<>((List<String>)policyMap.get(ADDRESS_EXCLUDES));
+         } else {
+            excludes = Collections.EMPTY_SET;
+         }
+
+         final TransformerConfiguration transformerConfig;
+
+         if (policyMap.containsKey(TRANSFORMER_CLASS_NAME)) {
+            transformerConfig = new TransformerConfiguration();
+            transformerConfig.setClassName((String) 
policyMap.get(TRANSFORMER_CLASS_NAME));
+            transformerConfig.setProperties((Map<String, String>) 
policyMap.get(TRANSFORMER_PROPERTIES_MAP));
+         } else {
+            transformerConfig = null;
+         }
+
+         final Map<String, Object> properties;
+
+         if (policyMap.containsKey(POLICY_PROPERTIES_MAP)) {
+            properties = (Map<String, Object>) 
policyMap.get(POLICY_PROPERTIES_MAP);
+         } else {
+            properties = null;
+         }
+
+         return new FederationReceiveFromAddressPolicy(policyName, autoDelete, 
autoDeleteDelay,
+                                                       autoDeleteMsgCount, 
maxHops, enableDiverts,
+                                                       includes, excludes, 
properties, transformerConfig,
+                                                       wildcardConfig);
+      } catch (Exception e) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded address policy entry: " + e.getMessage());
+      }
+   }
+
+   /**
+    * From the broker AMQP broker connection configuration element and the 
configured wild-card
+    * settings create an address match policy.
+    *
+    * @param element
+    *    The broker connections element configuration that creates this policy.
+    * @param wildcards
+    *    The configured wild-card settings for the broker or defaults.
+    *
+    * @return a new address match and handling policy for use in the broker 
connection.
+    */
+   @SuppressWarnings("unchecked")
+   public static FederationReceiveFromAddressPolicy 
create(AMQPFederationAddressPolicyElement element, WildcardConfiguration 
wildcards) {
+      final Set<String> includes;
+      final Set<String> excludes;
+
+      if (element.getIncludes() != null && !element.getIncludes().isEmpty()) {
+         includes = new HashSet<>(element.getIncludes().size());
+
+         element.getIncludes().forEach(addressMatch -> 
includes.add(addressMatch.getAddressMatch()));
+      } else {
+         includes = Collections.EMPTY_SET;
+      }
+
+      if (element.getExcludes() != null && !element.getIncludes().isEmpty()) {

Review Comment:
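   Copy-and-paste error? This null-checks element.getExcludes() but then empty-checks element.getIncludes(); the second check should be !element.getExcludes().isEmpty().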



##########
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/FederationConfiguration.java:
##########
@@ -74,7 +74,7 @@ public Map<String, FederationPolicy> getQueuePolicies() {
    }
 
    // strange spelling!, it allows a type match for singular of correct plural 
Policies from properties
-   public FederationConfiguration 
addQueuePolicie(FederationQueuePolicyConfiguration federationPolicy) {
+   public FederationConfiguration 
addQueuePolicy(FederationQueuePolicyConfiguration federationPolicy) {

Review Comment:
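   The comment above the method ("strange spelling!, ...") is inaccurate now that the method is renamed from addQueuePolicie to addQueuePolicy; it should be updated or removed.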



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java:
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+
+import java.util.AbstractMap;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
+import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
+import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_NAME;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_EXCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDE_FEDERATED;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_PRIORITY_ADJUSTMENT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_CLASS_NAME;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_PROPERTIES_MAP;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_PROPERTIES_MAP;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_ADDRESS_POLICY;
+
+/**
+ * Tools used when loading AMQP Broker connections configuration that includes 
Federation
+ * configuration.
+ */
+public final class AMQPFederationPolicySupport {
+
+   /**
+    * Default priority adjustment used for a federation queue match policy if 
nothing
+    * was configured in the broker configuration file.
+    */
+   public static final int DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT = -1;
+
+   /**
+    * Annotation added to received messages from address consumers that 
indicates how many
+    * times the message has traversed a federation link.
+    */
+   public static final Symbol MESSAGE_HOPS_ANNOTATION = 
Symbol.valueOf("x-opt-amq-fed-hops");
+
+   /**
+    * Property name used to embed a nested map of properties meant to be 
applied if the address
+    * indicated in a federation address receiver auto creates the federated 
address.
+    */
+   public static final Symbol FEDERATED_ADDRESS_SOURCE_PROPERTIES = 
Symbol.valueOf("federated-address-source-properties");
+
+   /**
+    * Create an AMQP Message used to instruct the remote peer that it should 
perform
+    * Federation operations on the given {@link 
FederationReceiveFromQueuePolicy}.
+    *
+    * @param policy
+    *    The policy to encode into an AMQP message.
+    *
+    * @return an AMQP Message with the encoded policy.
+    */
+   public static AMQPMessage 
encodeQueuePolicyControlMessage(FederationReceiveFromQueuePolicy policy) {
+      final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+      final MessageAnnotations messageAnnotations = new 
MessageAnnotations(annotations);
+      final Map<String, Object> policyMap = new LinkedHashMap<>();
+      final Section sectionBody = new AmqpValue(policyMap);
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      annotations.put(OPERATION_TYPE, ADD_QUEUE_POLICY);
+
+      policyMap.put(POLICY_NAME, policy.getPolicyName());
+      policyMap.put(QUEUE_INCLUDE_FEDERATED, policy.isIncludeFederated());
+      policyMap.put(QUEUE_PRIORITY_ADJUSTMENT, policy.getPriorityAjustment());
+
+      if (!policy.getIncludes().isEmpty()) {
+         final List<String> flattenedIncludes = new 
ArrayList<>(policy.getIncludes().size() * 2);
+         policy.getIncludes().forEach((entry) -> {
+            flattenedIncludes.add(entry.getKey());
+            flattenedIncludes.add(entry.getValue());
+         });
+
+         policyMap.put(QUEUE_INCLUDES, flattenedIncludes);
+      }
+
+      if (!policy.getExcludes().isEmpty()) {
+         final List<String> flatteneExcludes = new 
ArrayList<>(policy.getExcludes().size() * 2);
+         policy.getExcludes().forEach((entry) -> {
+            flatteneExcludes.add(entry.getKey());
+            flatteneExcludes.add(entry.getValue());
+         });
+
+         policyMap.put(QUEUE_EXCLUDES, flatteneExcludes);
+      }
+
+      if (!policy.getProperties().isEmpty()) {
+         policyMap.put(POLICY_PROPERTIES_MAP, policy.getProperties());
+      }
+
+      if (policy.getTransformerConfiguration() != null) {
+         final TransformerConfiguration config = 
policy.getTransformerConfiguration();
+
+         policyMap.put(TRANSFORMER_CLASS_NAME, config.getClassName());
+         if (!config.getProperties().isEmpty()) {
+            policyMap.put(TRANSFORMER_PROPERTIES_MAP, config.getProperties());
+         }
+      }
+
+      try {
+         final EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(messageAnnotations);
+         encoder.writeObject(sectionBody);
+
+         final byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPStandardMessage(0, data, null);
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   /**
+    * Create an AMQP Message used to instruct the remote peer that it should 
perform
+    * Federation operations on the given {@link 
FederationReceiveFromAddressPolicy}.
+    *
+    * @param policy
+    *    The policy to encode into an AMQP message.
+    *
+    * @return an AMQP Message with the encoded policy.
+    */
+   public static AMQPMessage 
encodeAddressPolicyControlMessage(FederationReceiveFromAddressPolicy policy) {
+      final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+      final MessageAnnotations messageAnnotations = new 
MessageAnnotations(annotations);
+      final Map<String, Object> policyMap = new LinkedHashMap<>();
+      final Section sectionBody = new AmqpValue(policyMap);
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      annotations.put(OPERATION_TYPE, ADD_ADDRESS_POLICY);
+
+      policyMap.put(POLICY_NAME, policy.getPolicyName());
+      policyMap.put(ADDRESS_AUTO_DELETE, policy.isAutoDelete());
+      policyMap.put(ADDRESS_AUTO_DELETE_DELAY, policy.getAutoDeleteDelay());
+      policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 
policy.getAutoDeleteMessageCount());
+      policyMap.put(ADDRESS_MAX_HOPS, policy.getMaxHops());
+      policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, 
policy.isEnableDivertBindings());
+      if (!policy.getIncludes().isEmpty()) {
+         policyMap.put(ADDRESS_INCLUDES, new 
ArrayList<>(policy.getIncludes()));
+      }
+      if (!policy.getExcludes().isEmpty()) {
+         policyMap.put(ADDRESS_EXCLUDES, new 
ArrayList<>(policy.getExcludes()));
+      }
+
+      if (!policy.getProperties().isEmpty()) {
+         policyMap.put(POLICY_PROPERTIES_MAP, policy.getProperties());
+      }
+
+      if (policy.getTransformerConfiguration() != null) {
+         final TransformerConfiguration config = 
policy.getTransformerConfiguration();
+
+         policyMap.put(TRANSFORMER_CLASS_NAME, config.getClassName());
+         if (!config.getProperties().isEmpty()) {
+            policyMap.put(TRANSFORMER_PROPERTIES_MAP, config.getProperties());
+         }
+      }
+
+      try {
+         final EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(messageAnnotations);
+         encoder.writeObject(sectionBody);
+
+         final byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPStandardMessage(0, data, null);
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   /**
+    * Given an AMQP Message decode an {@link FederationReceiveFromQueuePolicy} 
from it and return
+    * the decoded value. The message should have already been inspected and 
determined to be an
+    * control message of the add to policy type.
+    *
+    * @param message
+    *    The {@link AMQPMessage} that should carry an encoded {@link 
FederationReceiveFromQueuePolicy}
+    * @param wildcardConfig
+    *    The {@link WildcardConfiguration} to use in the decoded policy.
+    *
+    * @return a decoded {@link FederationReceiveFromQueuePolicy} instance.
+    *
+    * @throws ActiveMQException if an error occurs while decoding the policy.
+    */
+   @SuppressWarnings("unchecked")
+   public static FederationReceiveFromQueuePolicy 
decodeReceiveFromQueuePolicy(AMQPMessage message, WildcardConfiguration 
wildcardConfig) throws ActiveMQException {
+      final Section body = message.getBody();
+
+      if (!(body instanceof AmqpValue)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body was not an AmqpValue type");
+      }
+
+      final AmqpValue bodyValue = (AmqpValue) body;
+
+      if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof 
Map)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body AmqpValue did not carry an encoded Map");
+      }
+
+      try {
+         final Map<String, Object> policyMap = (Map<String, Object>) 
bodyValue.getValue();
+
+         if (!policyMap.containsKey(POLICY_NAME)) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required policy name");
+         }
+
+         final String policyName = (String) policyMap.get(POLICY_NAME);
+         final boolean includeFederated = (boolean) 
policyMap.getOrDefault(QUEUE_INCLUDE_FEDERATED, false);
+         final int priorityAdjustment = ((Number) 
policyMap.getOrDefault(QUEUE_PRIORITY_ADJUSTMENT, 
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)).intValue();
+         final Set<Map.Entry<String, String>> includes = 
decodeFlattenedFilterSet(policyMap, QUEUE_INCLUDES);
+         final Set<Map.Entry<String, String>> excludes = 
decodeFlattenedFilterSet(policyMap, QUEUE_EXCLUDES);
+         final TransformerConfiguration transformerConfig;
+
+         if (policyMap.containsKey(TRANSFORMER_CLASS_NAME)) {
+            transformerConfig = new TransformerConfiguration();
+            transformerConfig.setClassName((String) 
policyMap.get(TRANSFORMER_CLASS_NAME));
+            transformerConfig.setProperties((Map<String, String>) 
policyMap.get(TRANSFORMER_PROPERTIES_MAP));
+         } else {
+            transformerConfig = null;
+         }
+
+         final Map<String, Object> properties;
+
+         if (policyMap.containsKey(POLICY_PROPERTIES_MAP)) {
+            properties = (Map<String, Object>) 
policyMap.get(POLICY_PROPERTIES_MAP);
+         } else {
+            properties = null;
+         }
+
+         return new FederationReceiveFromQueuePolicy(policyName, 
includeFederated, priorityAdjustment,
+                                                     includes, excludes, 
properties, transformerConfig,
+                                                     wildcardConfig);
+      } catch (ActiveMQException amqEx) {
+         throw amqEx;
+      } catch (Exception e) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded queue policy entry: " + e.getMessage());
+      }
+   }
+
+   /**
+    * Decodes the filter set stored in the policy map under the given key. The value is expected
+    * to be a flattened List in which each consecutive pair of elements forms one entry (first
+    * element the entry key, second element the entry value).
+    *
+    * @param policyMap
+    *    The decoded policy map taken from the control message body.
+    * @param target
+    *    The key in the policy map whose value holds the flattened list.
+    *
+    * @return the decoded entries, or an empty set if the key maps to null or to an empty list.
+    *
+    * @throws ActiveMQException if the value is not a List, holds an odd number of elements or
+    *                           holds an element that is neither null nor a String.
+    */
+   private static Set<Map.Entry<String, String>> decodeFlattenedFilterSet(Map<String, Object> policyMap, String target) throws ActiveMQException {
+      final Object encodedObject = policyMap.get(target);
+
+      if (encodedObject == null) {
+         return Collections.emptySet();
+      }
+
+      if (!(encodedObject instanceof List)) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Encoded queue policy entry was not the expected List type : " + target);
+      }
+
+      final Set<Map.Entry<String, String>> policyEntrySet;
+
+      try {
+         final List<?> flattenedEntrySet = (List<?>) encodedObject;
+
+         if (flattenedEntrySet.isEmpty()) {
+            return Collections.emptySet();
+         }
+
+         if ((flattenedEntrySet.size() & 1) != 0) {
+            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Encoded queue policy entry must contain an even number of elements : " + target);
+         }
+
+         policyEntrySet = new HashSet<>(Math.max(2, flattenedEntrySet.size() / 2));
+
+         for (int i = 0; i < flattenedEntrySet.size(); i += 2) {
+            final Object entryKey = flattenedEntrySet.get(i);
+            final Object entryValue = flattenedEntrySet.get(i + 1);
+
+            // An unchecked cast to List<String> would let non-String elements through because
+            // of type erasure, so the element types are verified explicitly. Null elements are
+            // still accepted as before.
+            if ((entryKey != null && !(entryKey instanceof String)) ||
+                (entryValue != null && !(entryValue instanceof String))) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+                  "Encoded queue policy entry contained a non String element : " + target);
+            }
+
+            policyEntrySet.add(new SimpleEntry<>((String) entryKey, (String) entryValue));
+         }
+
+      } catch (ActiveMQException amqEx) {
+         throw amqEx;
+      } catch (Exception e) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded queue policy entry: " + e.getMessage());
+      }
+
+      return policyEntrySet;
+   }
+
+   /**
+    * Given an AMQP Message decode a {@link FederationReceiveFromAddressPolicy} from it and return
+    * the decoded value. The message should have already been inspected and determined to be a
+    * control message of the add to policy type.
+    *
+    * @param message
+    *    The {@link AMQPMessage} that should carry an encoded {@link FederationReceiveFromAddressPolicy}
+    * @param wildcardConfig
+    *    The {@link WildcardConfiguration} to use in the decoded policy.
+    *
+    * @return a decoded {@link FederationReceiveFromAddressPolicy} instance.
+    *
+    * @throws ActiveMQException if an error occurs during the policy decode.
+    */
+   @SuppressWarnings("unchecked")
+   public static FederationReceiveFromAddressPolicy decodeReceiveFromAddressPolicy(AMQPMessage message, WildcardConfiguration wildcardConfig) throws ActiveMQException {
+      final Section body = message.getBody();
+
+      if (!(body instanceof AmqpValue)) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body was not an AmqpValue type");
+      }
+
+      final AmqpValue bodyValue = (AmqpValue) body;
+
+      if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof Map)) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body AmqpValue did not carry an encoded Map");
+      }
+
+      try {
+         final Map<String, Object> policyMap = (Map<String, Object>) bodyValue.getValue();
+
+         // The policy name and the max hops value are mandatory, all other values have defaults.
+         if (!policyMap.containsKey(POLICY_NAME)) {
+            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required policy name");
+         }
+
+         if (!policyMap.containsKey(ADDRESS_MAX_HOPS)) {
+            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required max hops configuration");
+         }
+
+         final String policyName = (String) policyMap.get(POLICY_NAME);
+         final boolean autoDelete = (Boolean) policyMap.getOrDefault(ADDRESS_AUTO_DELETE, false);
+         final long autoDeleteDelay = ((Number) policyMap.getOrDefault(ADDRESS_AUTO_DELETE_DELAY, 0L)).longValue();
+         final long autoDeleteMsgCount = ((Number) policyMap.getOrDefault(ADDRESS_AUTO_DELETE_MSG_COUNT, 0L)).longValue();
+         final int maxHops = ((Number) policyMap.get(ADDRESS_MAX_HOPS)).intValue();
+         final boolean enableDiverts = (Boolean) policyMap.getOrDefault(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+
+         final Set<String> includes;
+         final Set<String> excludes;
+
+         if (policyMap.containsKey(ADDRESS_INCLUDES)) {
+            includes = new HashSet<>((List<String>) policyMap.get(ADDRESS_INCLUDES));
+         } else {
+            includes = Collections.emptySet();
+         }
+
+         if (policyMap.containsKey(ADDRESS_EXCLUDES)) {
+            excludes = new HashSet<>((List<String>) policyMap.get(ADDRESS_EXCLUDES));
+         } else {
+            excludes = Collections.emptySet();
+         }
+
+         final TransformerConfiguration transformerConfig;
+
+         if (policyMap.containsKey(TRANSFORMER_CLASS_NAME)) {
+            transformerConfig = new TransformerConfiguration();
+            transformerConfig.setClassName((String) policyMap.get(TRANSFORMER_CLASS_NAME));
+            transformerConfig.setProperties((Map<String, String>) policyMap.get(TRANSFORMER_PROPERTIES_MAP));
+         } else {
+            transformerConfig = null;
+         }
+
+         final Map<String, Object> properties;
+
+         if (policyMap.containsKey(POLICY_PROPERTIES_MAP)) {
+            properties = (Map<String, Object>) policyMap.get(POLICY_PROPERTIES_MAP);
+         } else {
+            properties = null;
+         }
+
+         return new FederationReceiveFromAddressPolicy(policyName, autoDelete, autoDeleteDelay,
+                                                       autoDeleteMsgCount, maxHops, enableDiverts,
+                                                       includes, excludes, properties, transformerConfig,
+                                                       wildcardConfig);
+      } catch (ActiveMQException amqEx) {
+         // Keep the specific malformed message errors raised above intact instead of wrapping
+         // them a second time, matching the handling in the queue policy decoder.
+         throw amqEx;
+      } catch (Exception e) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded address policy entry: " + e.getMessage());
+      }
+   }
+
+   /**
+    * From the AMQP broker connection configuration element and the configured wild-card
+    * settings create an address match policy.
+    *
+    * @param element
+    *    The broker connections element configuration that creates this policy.
+    * @param wildcards
+    *    The configured wild-card settings for the broker or defaults.
+    *
+    * @return a new address match and handling policy for use in the broker connection.
+    */
+   public static FederationReceiveFromAddressPolicy create(AMQPFederationAddressPolicyElement element, WildcardConfiguration wildcards) {
+      final Set<String> includes;
+      final Set<String> excludes;
+
+      if (element.getIncludes() != null && !element.getIncludes().isEmpty()) {
+         includes = new HashSet<>(element.getIncludes().size());
+
+         element.getIncludes().forEach(addressMatch -> includes.add(addressMatch.getAddressMatch()));
+      } else {
+         includes = Collections.emptySet();
+      }
+
+      // The excludes are checked on their own: testing the includes here would drop configured
+      // excludes whenever the includes are empty and would throw a NullPointerException when the
+      // includes are null but excludes are present.
+      if (element.getExcludes() != null && !element.getExcludes().isEmpty()) {
+         excludes = new HashSet<>(element.getExcludes().size());
+
+         element.getExcludes().forEach(addressMatch -> excludes.add(addressMatch.getAddressMatch()));
+      } else {
+         excludes = Collections.emptySet();
+      }
+
+      // We translate from broker configuration to actual implementation to avoid any coupling here
+      // as broker configuration could change and or be updated.
+
+      return new FederationReceiveFromAddressPolicy(
+         element.getName(),
+         element.getAutoDelete() == null ? false : element.getAutoDelete(),
+         element.getAutoDeleteDelay() == null ? 0 : element.getAutoDeleteDelay(),
+         element.getAutoDeleteMessageCount() == null ? 0 : element.getAutoDeleteMessageCount(),
+         element.getMaxHops(),
+         element.isEnableDivertBindings() == null ? false : element.isEnableDivertBindings(),
+         includes,
+         excludes,
+         element.getProperties(),
+         element.getTransformerConfiguration(),
+         wildcards);
+   }
+
+   /**
+    * From the broker AMQP broker connection configuration element and the 
configured wild-card
+    * settings create an queue match policy. If not configured otherwise the 
policy is always
+    * defaulted to a value of <code>-1</code> in order to attempt to prevent 
federation consumers
+    * from consuming messages on the remote when a local consumer is present.
+    *
+    * @param element
+    *    The broker connections element configuration that creates this policy.
+    * @param wildcards
+    *    The configured wild-card settings for the broker or defaults.
+    *
+    * @return a new queue match and handling policy for use in the broker 
connection.
+    */
+   @SuppressWarnings("unchecked")
+   public static FederationReceiveFromQueuePolicy 
create(AMQPFederationQueuePolicyElement element, WildcardConfiguration 
wildcards) {
+      final Set<Map.Entry<String, String>> includes;
+      final Set<Map.Entry<String, String>> excludes;
+
+      if (element.getIncludes() != null && !element.getIncludes().isEmpty()) {
+         includes = new HashSet<>(element.getIncludes().size());
+
+         element.getIncludes().forEach(queueMatch ->
+            includes.add(new AbstractMap.SimpleImmutableEntry<String, 
String>(queueMatch.getAddressMatch(), queueMatch.getQueueMatch())));
+      } else {
+         includes = Collections.EMPTY_SET;
+      }
+
+      if (element.getExcludes() != null && !element.getIncludes().isEmpty()) {

Review Comment:
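   ditto (this excludes check tests `element.getIncludes().isEmpty()`; it looks like `element.getExcludes().isEmpty()` was intended)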



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to