tabish121 commented on code in PR #4605:
URL: https://github.com/apache/activemq-artemis/pull/4605#discussion_r1318887367


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -239,7 +271,44 @@ SimpleString 
getMirrorSNF(AMQPMirrorBrokerConnectionElement mirrorElement) {
       return snf;
    }
 
+   /**
+    * Adds a remote link closed event intercepter that can intercept the 
closed event and if it
+    * returns true indicate that the close has been handled and that normal 
broker connection
+    * remote link closed handling should be ignored.
+    *
+    * @param id
+    *    A unique Id value that identifies the intercepter for later removal.
+    * @param intercepter
+    *    The predicate that will be called for any link close.
+    *
+    * @return this broker connection instance.
+    */
+   public AMQPBrokerConnection addLinkClosedIntercepter(String id, 
Predicate<Link> intercepter) {

Review Comment:
   fixed



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java:
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+
+import java.util.AbstractMap;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
+import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
+import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_NAME;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_EXCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDE_FEDERATED;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_PRIORITY_ADJUSTMENT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_CLASS_NAME;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_PROPERTIES_MAP;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_PROPERTIES_MAP;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_ADDRESS_POLICY;
+
+/**
+ * Tools used when loading AMQP Broker connections configuration that includes 
Federation
+ * configuration.
+ */
+public final class AMQPFederationPolicySupport {
+
+   /**
+    * Default priority adjustment used for a federation queue match policy if 
nothing
+    * was configured in the broker configuration file.
+    */
+   public static final int DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT = -1;
+
+   /**
+    * Annotation added to received messages from address consumers that 
indicates how many
+    * times the message has traversed a federation link.
+    */
+   public static final Symbol MESSAGE_HOPS_ANNOTATION = 
Symbol.valueOf("x-opt-amq-fed-hops");
+
+   /**
+    * Property name used to embed a nested map of properties meant to be 
applied if the address
+    * indicated in an federation address receiver auto creates the federated 
address.
+    */
+   public static final Symbol FEDERATED_ADDRESS_SOURCE_PROPERTIES = 
Symbol.valueOf("federated-address-source-properties");
+
+   /**
+    * Create an AMQP Message used to instruct the remote peer that it should 
perform
+    * Federation operations on the given {@link 
FederationReceiveFromQueuePolicy}.
+    *
+    * @param policy
+    *    The policy to encode into an AMQP message.
+    *
+    * @return an AMQP Message with the encoded policy.
+    */
+   public static AMQPMessage 
encodeQueuePolicyControlMessage(FederationReceiveFromQueuePolicy policy) {
+      final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+      final MessageAnnotations messageAnnotations = new 
MessageAnnotations(annotations);
+      final Map<String, Object> policyMap = new LinkedHashMap<>();
+      final Section sectionBody = new AmqpValue(policyMap);
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      annotations.put(OPERATION_TYPE, ADD_QUEUE_POLICY);
+
+      policyMap.put(POLICY_NAME, policy.getPolicyName());
+      policyMap.put(QUEUE_INCLUDE_FEDERATED, policy.isIncludeFederated());
+      policyMap.put(QUEUE_PRIORITY_ADJUSTMENT, policy.getPriorityAjustment());
+
+      if (!policy.getIncludes().isEmpty()) {
+         final List<String> flattenedIncludes = new 
ArrayList<>(policy.getIncludes().size() * 2);
+         policy.getIncludes().forEach((entry) -> {
+            flattenedIncludes.add(entry.getKey());
+            flattenedIncludes.add(entry.getValue());
+         });
+
+         policyMap.put(QUEUE_INCLUDES, flattenedIncludes);
+      }
+
+      if (!policy.getExcludes().isEmpty()) {
+         final List<String> flatteneExcludes = new 
ArrayList<>(policy.getExcludes().size() * 2);
+         policy.getExcludes().forEach((entry) -> {
+            flatteneExcludes.add(entry.getKey());
+            flatteneExcludes.add(entry.getValue());
+         });
+
+         policyMap.put(QUEUE_EXCLUDES, flatteneExcludes);
+      }
+
+      if (!policy.getProperties().isEmpty()) {
+         policyMap.put(POLICY_PROPERTIES_MAP, policy.getProperties());
+      }
+
+      if (policy.getTransformerConfiguration() != null) {
+         final TransformerConfiguration config = 
policy.getTransformerConfiguration();
+
+         policyMap.put(TRANSFORMER_CLASS_NAME, config.getClassName());
+         if (!config.getProperties().isEmpty()) {
+            policyMap.put(TRANSFORMER_PROPERTIES_MAP, config.getProperties());
+         }
+      }
+
+      try {
+         final EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(messageAnnotations);
+         encoder.writeObject(sectionBody);
+
+         final byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPStandardMessage(0, data, null);
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   /**
+    * Create an AMQP Message used to instruct the remote peer that it should 
perform
+    * Federation operations on the given {@link 
FederationReceiveFromAddressPolicy}.
+    *
+    * @param policy
+    *    The policy to encode into an AMQP message.
+    *
+    * @return an AMQP Message with the encoded policy.
+    */
+   public static AMQPMessage 
encodeAddressPolicyControlMessage(FederationReceiveFromAddressPolicy policy) {
+      final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+      final MessageAnnotations messageAnnotations = new 
MessageAnnotations(annotations);
+      final Map<String, Object> policyMap = new LinkedHashMap<>();
+      final Section sectionBody = new AmqpValue(policyMap);
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      annotations.put(OPERATION_TYPE, ADD_ADDRESS_POLICY);
+
+      policyMap.put(POLICY_NAME, policy.getPolicyName());
+      policyMap.put(ADDRESS_AUTO_DELETE, policy.isAutoDelete());
+      policyMap.put(ADDRESS_AUTO_DELETE_DELAY, policy.getAutoDeleteDelay());
+      policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 
policy.getAutoDeleteMessageCount());
+      policyMap.put(ADDRESS_MAX_HOPS, policy.getMaxHops());
+      policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, 
policy.isEnableDivertBindings());
+      if (!policy.getIncludes().isEmpty()) {
+         policyMap.put(ADDRESS_INCLUDES, new 
ArrayList<>(policy.getIncludes()));
+      }
+      if (!policy.getExcludes().isEmpty()) {
+         policyMap.put(ADDRESS_EXCLUDES, new 
ArrayList<>(policy.getExcludes()));
+      }
+
+      if (!policy.getProperties().isEmpty()) {
+         policyMap.put(POLICY_PROPERTIES_MAP, policy.getProperties());
+      }
+
+      if (policy.getTransformerConfiguration() != null) {
+         final TransformerConfiguration config = 
policy.getTransformerConfiguration();
+
+         policyMap.put(TRANSFORMER_CLASS_NAME, config.getClassName());
+         if (!config.getProperties().isEmpty()) {
+            policyMap.put(TRANSFORMER_PROPERTIES_MAP, config.getProperties());
+         }
+      }
+
+      try {
+         final EncoderImpl encoder = TLSEncode.getEncoder();
+         encoder.setByteBuffer(new NettyWritable(buffer));
+         encoder.writeObject(messageAnnotations);
+         encoder.writeObject(sectionBody);
+
+         final byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         return new AMQPStandardMessage(0, data, null);
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   /**
+    * Given an AMQP Message decode an {@link FederationReceiveFromQueuePolicy} 
from it and return
+    * the decoded value. The message should have already been inspected and 
determined to be an
+    * control message of the add to policy type.
+    *
+    * @param message
+    *    The {@link AMQPMessage} that should carry an encoded {@link 
FederationReceiveFromQueuePolicy}
+    * @param wildcardConfig
+    *    The {@link WildcardConfiguration} to use in the decoded policy.
+    *
+    * @return a decoded {@link FederationReceiveFromQueuePolicy} instance.
+    *
+    * @throws ActiveMQException if an error occurs while decoding the policy.
+    */
+   @SuppressWarnings("unchecked")
+   public static FederationReceiveFromQueuePolicy 
decodeReceiveFromQueuePolicy(AMQPMessage message, WildcardConfiguration 
wildcardConfig) throws ActiveMQException {
+      final Section body = message.getBody();
+
+      if (!(body instanceof AmqpValue)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body was not an AmqpValue type");
+      }
+
+      final AmqpValue bodyValue = (AmqpValue) body;
+
+      if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof 
Map)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body AmqpValue did not carry an encoded Map");
+      }
+
+      try {
+         final Map<String, Object> policyMap = (Map<String, Object>) 
bodyValue.getValue();
+
+         if (!policyMap.containsKey(POLICY_NAME)) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required policy name");
+         }
+
+         final String policyName = (String) policyMap.get(POLICY_NAME);
+         final boolean includeFederated = (boolean) 
policyMap.getOrDefault(QUEUE_INCLUDE_FEDERATED, false);
+         final int priorityAdjustment = ((Number) 
policyMap.getOrDefault(QUEUE_PRIORITY_ADJUSTMENT, 
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)).intValue();
+         final Set<Map.Entry<String, String>> includes = 
decodeFlattenedFilterSet(policyMap, QUEUE_INCLUDES);
+         final Set<Map.Entry<String, String>> excludes = 
decodeFlattenedFilterSet(policyMap, QUEUE_EXCLUDES);
+         final TransformerConfiguration transformerConfig;
+
+         if (policyMap.containsKey(TRANSFORMER_CLASS_NAME)) {
+            transformerConfig = new TransformerConfiguration();
+            transformerConfig.setClassName((String) 
policyMap.get(TRANSFORMER_CLASS_NAME));
+            transformerConfig.setProperties((Map<String, String>) 
policyMap.get(TRANSFORMER_PROPERTIES_MAP));
+         } else {
+            transformerConfig = null;
+         }
+
+         final Map<String, Object> properties;
+
+         if (policyMap.containsKey(POLICY_PROPERTIES_MAP)) {
+            properties = (Map<String, Object>) 
policyMap.get(POLICY_PROPERTIES_MAP);
+         } else {
+            properties = null;
+         }
+
+         return new FederationReceiveFromQueuePolicy(policyName, 
includeFederated, priorityAdjustment,
+                                                     includes, excludes, 
properties, transformerConfig,
+                                                     wildcardConfig);
+      } catch (ActiveMQException amqEx) {
+         throw amqEx;
+      } catch (Exception e) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded queue policy entry: " + e.getMessage());
+      }
+   }
+
+   @SuppressWarnings("unchecked")
+   private static Set<Map.Entry<String, String>> 
decodeFlattenedFilterSet(Map<String, Object> policyMap, String target) throws 
ActiveMQException {
+      final Object encodedObject = policyMap.get(target);
+
+      if (encodedObject == null) {
+         return Collections.EMPTY_SET;
+      }
+
+      if (!(encodedObject instanceof List)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Encoded queue policy entry was not the expected List type : " + 
target);
+      }
+
+      final Set<Map.Entry<String, String>> policyEntrySet;
+
+      try {
+         final List<String> flattenedEntrySet = (List<String>) encodedObject;
+
+         if (flattenedEntrySet.isEmpty()) {
+            return Collections.EMPTY_SET;
+         }
+
+         if ((flattenedEntrySet.size() & 1) != 0) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Encoded queue policy entry was must contain an even number of 
elements : " + target);
+         }
+
+         policyEntrySet = new HashSet<>(Math.max(2, flattenedEntrySet.size() / 
2));
+
+         for (int i = 0; i < flattenedEntrySet.size(); ) {
+            policyEntrySet.add(new SimpleEntry<>(flattenedEntrySet.get(i++), 
flattenedEntrySet.get(i++)));
+         }
+
+      } catch (ActiveMQException amqEx) {
+         throw amqEx;
+      } catch (Exception e) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded queue policy entry: " + e.getMessage());
+      }
+
+      return policyEntrySet;
+   }
+
+   /**
+    * Given an AMQP Message decode an {@link 
FederationReceiveFromAddressPolicy} from it and return
+    * the decoded value. The message should have already been inspected and 
determined to be an
+    * control message of the add to policy type.
+    *
+    * @param message
+    *    The {@link AMQPMessage} that should carry an encoded {@link 
FederationReceiveFromQueuePolicy}
+    * @param wildcardConfig
+    *    The {@link WildcardConfiguration} to use in the decoded policy.
+    *
+    * @return a decoded {@link FederationReceiveFromAddressPolicy} instance.
+    *
+    * @throws ActiveMQException if an error occurs during the policy decode.
+    */
+   @SuppressWarnings("unchecked")
+   public static FederationReceiveFromAddressPolicy 
decodeReceiveFromAddressPolicy(AMQPMessage message, WildcardConfiguration 
wildcardConfig) throws ActiveMQException {
+      final Section body = message.getBody();
+
+      if (!(body instanceof AmqpValue)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body was not an AmqpValue type");
+      }
+
+      final AmqpValue bodyValue = (AmqpValue) body;
+
+      if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof 
Map)) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body AmqpValue did not carry an encoded Map");
+      }
+
+      try {
+         final Map<String, Object> policyMap = (Map<String, Object>) 
bodyValue.getValue();
+
+         if (!policyMap.containsKey(POLICY_NAME)) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required policy name");
+         }
+
+         if (!policyMap.containsKey(ADDRESS_MAX_HOPS)) {
+            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required max hops 
configuration");
+         }
+
+         final String policyName = (String) policyMap.get(POLICY_NAME);
+         final boolean autoDelete = (Boolean) 
policyMap.getOrDefault(ADDRESS_AUTO_DELETE, false);
+         final long autoDeleteDelay = ((Number) 
policyMap.getOrDefault(ADDRESS_AUTO_DELETE_DELAY, 0L)).longValue();
+         final long autoDeleteMsgCount = ((Number) 
policyMap.getOrDefault(ADDRESS_AUTO_DELETE_MSG_COUNT, 0L)).longValue();
+         final int maxHops = ((Number) 
policyMap.get(ADDRESS_MAX_HOPS)).intValue();
+         final boolean enableDiverts = (Boolean) 
policyMap.getOrDefault(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+
+         final Set<String> includes;
+         final Set<String> excludes;
+
+         if (policyMap.containsKey(ADDRESS_INCLUDES)) {
+            includes = (Set<String>) new 
HashSet<>((List<String>)policyMap.get(ADDRESS_INCLUDES));
+         } else {
+            includes = Collections.EMPTY_SET;
+         }
+
+         if (policyMap.containsKey(ADDRESS_EXCLUDES)) {
+            excludes = (Set<String>) new 
HashSet<>((List<String>)policyMap.get(ADDRESS_EXCLUDES));
+         } else {
+            excludes = Collections.EMPTY_SET;
+         }
+
+         final TransformerConfiguration transformerConfig;
+
+         if (policyMap.containsKey(TRANSFORMER_CLASS_NAME)) {
+            transformerConfig = new TransformerConfiguration();
+            transformerConfig.setClassName((String) 
policyMap.get(TRANSFORMER_CLASS_NAME));
+            transformerConfig.setProperties((Map<String, String>) 
policyMap.get(TRANSFORMER_PROPERTIES_MAP));
+         } else {
+            transformerConfig = null;
+         }
+
+         final Map<String, Object> properties;
+
+         if (policyMap.containsKey(POLICY_PROPERTIES_MAP)) {
+            properties = (Map<String, Object>) 
policyMap.get(POLICY_PROPERTIES_MAP);
+         } else {
+            properties = null;
+         }
+
+         return new FederationReceiveFromAddressPolicy(policyName, autoDelete, 
autoDeleteDelay,
+                                                       autoDeleteMsgCount, 
maxHops, enableDiverts,
+                                                       includes, excludes, 
properties, transformerConfig,
+                                                       wildcardConfig);
+      } catch (Exception e) {
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded address policy entry: " + e.getMessage());
+      }
+   }
+
+   /**
+    * From the broker AMQP broker connection configuration element and the 
configured wild-card
+    * settings create an address match policy.
+    *
+    * @param element
+    *    The broker connections element configuration that creates this policy.
+    * @param wildcards
+    *    The configured wild-card settings for the broker or defaults.
+    *
+    * @return a new address match and handling policy for use in the broker 
connection.
+    */
+   @SuppressWarnings("unchecked")
+   public static FederationReceiveFromAddressPolicy 
create(AMQPFederationAddressPolicyElement element, WildcardConfiguration 
wildcards) {
+      final Set<String> includes;
+      final Set<String> excludes;
+
+      if (element.getIncludes() != null && !element.getIncludes().isEmpty()) {
+         includes = new HashSet<>(element.getIncludes().size());
+
+         element.getIncludes().forEach(addressMatch -> 
includes.add(addressMatch.getAddressMatch()));
+      } else {
+         includes = Collections.EMPTY_SET;
+      }
+
+      if (element.getExcludes() != null && !element.getIncludes().isEmpty()) {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to