gemmellr commented on code in PR #4605:
URL: https://github.com/apache/activemq-artemis/pull/4605#discussion_r1318823508
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -239,7 +271,44 @@ SimpleString
getMirrorSNF(AMQPMirrorBrokerConnectionElement mirrorElement) {
return snf;
}
+ /**
+ * Adds a remote link closed event intercepter that can intercept the
closed event and if it
+ * returns true indicate that the close has been handled and that normal
broker connection
+ * remote link closed handling should be ignored.
+ *
+ * @param id
+ * A unique Id value that identifies the intercepter for later removal.
+ * @param intercepter
+ * The predicate that will be called for any link close.
+ *
+ * @return this broker connection instance.
+ */
+ public AMQPBrokerConnection addLinkClosedIntercepter(String id,
Predicate<Link> intercepter) {
Review Comment:
Rename "Intercepter" -> "Interceptor" (in the method name and its Javadoc)
for consistency with the spelling used in other areas?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java:
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+
+import java.util.AbstractMap;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
+import
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
+import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_NAME;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_EXCLUDES;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDES;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDE_FEDERATED;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_PRIORITY_ADJUSTMENT;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_CLASS_NAME;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_PROPERTIES_MAP;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_PROPERTIES_MAP;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_ADDRESS_POLICY;
+
+/**
+ * Tools used when loading AMQP Broker connections configuration that includes
Federation
+ * configuration.
+ */
+public final class AMQPFederationPolicySupport {
+
+ /**
+ * Default priority adjustment used for a federation queue match policy if
nothing
+ * was configured in the broker configuration file.
+ */
+ public static final int DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT = -1;
+
+ /**
+ * Annotation added to received messages from address consumers that
indicates how many
+ * times the message has traversed a federation link.
+ */
+ public static final Symbol MESSAGE_HOPS_ANNOTATION =
Symbol.valueOf("x-opt-amq-fed-hops");
+
+ /**
+ * Property name used to embed a nested map of properties meant to be
applied if the address
+ * indicated in an federation address receiver auto creates the federated
address.
+ */
+ public static final Symbol FEDERATED_ADDRESS_SOURCE_PROPERTIES =
Symbol.valueOf("federated-address-source-properties");
+
+ /**
+ * Create an AMQP Message used to instruct the remote peer that it should
perform
+ * Federation operations on the given {@link
FederationReceiveFromQueuePolicy}.
+ *
+ * @param policy
+ * The policy to encode into an AMQP message.
+ *
+ * @return an AMQP Message with the encoded policy.
+ */
+ public static AMQPMessage
encodeQueuePolicyControlMessage(FederationReceiveFromQueuePolicy policy) {
+ final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+ final MessageAnnotations messageAnnotations = new
MessageAnnotations(annotations);
+ final Map<String, Object> policyMap = new LinkedHashMap<>();
+ final Section sectionBody = new AmqpValue(policyMap);
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ annotations.put(OPERATION_TYPE, ADD_QUEUE_POLICY);
+
+ policyMap.put(POLICY_NAME, policy.getPolicyName());
+ policyMap.put(QUEUE_INCLUDE_FEDERATED, policy.isIncludeFederated());
+ policyMap.put(QUEUE_PRIORITY_ADJUSTMENT, policy.getPriorityAjustment());
+
+ if (!policy.getIncludes().isEmpty()) {
+ final List<String> flattenedIncludes = new
ArrayList<>(policy.getIncludes().size() * 2);
+ policy.getIncludes().forEach((entry) -> {
+ flattenedIncludes.add(entry.getKey());
+ flattenedIncludes.add(entry.getValue());
+ });
+
+ policyMap.put(QUEUE_INCLUDES, flattenedIncludes);
+ }
+
+ if (!policy.getExcludes().isEmpty()) {
+ final List<String> flatteneExcludes = new
ArrayList<>(policy.getExcludes().size() * 2);
+ policy.getExcludes().forEach((entry) -> {
+ flatteneExcludes.add(entry.getKey());
+ flatteneExcludes.add(entry.getValue());
+ });
+
+ policyMap.put(QUEUE_EXCLUDES, flatteneExcludes);
+ }
+
+ if (!policy.getProperties().isEmpty()) {
+ policyMap.put(POLICY_PROPERTIES_MAP, policy.getProperties());
+ }
+
+ if (policy.getTransformerConfiguration() != null) {
+ final TransformerConfiguration config =
policy.getTransformerConfiguration();
+
+ policyMap.put(TRANSFORMER_CLASS_NAME, config.getClassName());
+ if (!config.getProperties().isEmpty()) {
+ policyMap.put(TRANSFORMER_PROPERTIES_MAP, config.getProperties());
+ }
+ }
+
+ try {
+ final EncoderImpl encoder = TLSEncode.getEncoder();
+ encoder.setByteBuffer(new NettyWritable(buffer));
+ encoder.writeObject(messageAnnotations);
+ encoder.writeObject(sectionBody);
+
+ final byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
+
+ return new AMQPStandardMessage(0, data, null);
+ } finally {
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ buffer.release();
+ }
+ }
+
+ /**
+ * Create an AMQP Message used to instruct the remote peer that it should
perform
+ * Federation operations on the given {@link
FederationReceiveFromAddressPolicy}.
+ *
+ * @param policy
+ * The policy to encode into an AMQP message.
+ *
+ * @return an AMQP Message with the encoded policy.
+ */
+ public static AMQPMessage
encodeAddressPolicyControlMessage(FederationReceiveFromAddressPolicy policy) {
+ final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+ final MessageAnnotations messageAnnotations = new
MessageAnnotations(annotations);
+ final Map<String, Object> policyMap = new LinkedHashMap<>();
+ final Section sectionBody = new AmqpValue(policyMap);
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ annotations.put(OPERATION_TYPE, ADD_ADDRESS_POLICY);
+
+ policyMap.put(POLICY_NAME, policy.getPolicyName());
+ policyMap.put(ADDRESS_AUTO_DELETE, policy.isAutoDelete());
+ policyMap.put(ADDRESS_AUTO_DELETE_DELAY, policy.getAutoDeleteDelay());
+ policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT,
policy.getAutoDeleteMessageCount());
+ policyMap.put(ADDRESS_MAX_HOPS, policy.getMaxHops());
+ policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS,
policy.isEnableDivertBindings());
+ if (!policy.getIncludes().isEmpty()) {
+ policyMap.put(ADDRESS_INCLUDES, new
ArrayList<>(policy.getIncludes()));
+ }
+ if (!policy.getExcludes().isEmpty()) {
+ policyMap.put(ADDRESS_EXCLUDES, new
ArrayList<>(policy.getExcludes()));
+ }
+
+ if (!policy.getProperties().isEmpty()) {
+ policyMap.put(POLICY_PROPERTIES_MAP, policy.getProperties());
+ }
+
+ if (policy.getTransformerConfiguration() != null) {
+ final TransformerConfiguration config =
policy.getTransformerConfiguration();
+
+ policyMap.put(TRANSFORMER_CLASS_NAME, config.getClassName());
+ if (!config.getProperties().isEmpty()) {
+ policyMap.put(TRANSFORMER_PROPERTIES_MAP, config.getProperties());
+ }
+ }
+
+ try {
+ final EncoderImpl encoder = TLSEncode.getEncoder();
+ encoder.setByteBuffer(new NettyWritable(buffer));
+ encoder.writeObject(messageAnnotations);
+ encoder.writeObject(sectionBody);
+
+ final byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
+
+ return new AMQPStandardMessage(0, data, null);
+ } finally {
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ buffer.release();
+ }
+ }
+
+ /**
+ * Given an AMQP Message decode an {@link FederationReceiveFromQueuePolicy}
from it and return
+ * the decoded value. The message should have already been inspected and
determined to be an
+ * control message of the add to policy type.
+ *
+ * @param message
+ * The {@link AMQPMessage} that should carry an encoded {@link
FederationReceiveFromQueuePolicy}
+ * @param wildcardConfig
+ * The {@link WildcardConfiguration} to use in the decoded policy.
+ *
+ * @return a decoded {@link FederationReceiveFromQueuePolicy} instance.
+ *
+ * @throws ActiveMQException if an error occurs while decoding the policy.
+ */
+ @SuppressWarnings("unchecked")
+ public static FederationReceiveFromQueuePolicy
decodeReceiveFromQueuePolicy(AMQPMessage message, WildcardConfiguration
wildcardConfig) throws ActiveMQException {
+ final Section body = message.getBody();
+
+ if (!(body instanceof AmqpValue)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body was not an AmqpValue type");
+ }
+
+ final AmqpValue bodyValue = (AmqpValue) body;
+
+ if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof
Map)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body AmqpValue did not carry an encoded Map");
+ }
+
+ try {
+ final Map<String, Object> policyMap = (Map<String, Object>)
bodyValue.getValue();
+
+ if (!policyMap.containsKey(POLICY_NAME)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body did not carry the required policy name");
+ }
+
+ final String policyName = (String) policyMap.get(POLICY_NAME);
+ final boolean includeFederated = (boolean)
policyMap.getOrDefault(QUEUE_INCLUDE_FEDERATED, false);
+ final int priorityAdjustment = ((Number)
policyMap.getOrDefault(QUEUE_PRIORITY_ADJUSTMENT,
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)).intValue();
+ final Set<Map.Entry<String, String>> includes =
decodeFlattenedFilterSet(policyMap, QUEUE_INCLUDES);
+ final Set<Map.Entry<String, String>> excludes =
decodeFlattenedFilterSet(policyMap, QUEUE_EXCLUDES);
+ final TransformerConfiguration transformerConfig;
+
+ if (policyMap.containsKey(TRANSFORMER_CLASS_NAME)) {
+ transformerConfig = new TransformerConfiguration();
+ transformerConfig.setClassName((String)
policyMap.get(TRANSFORMER_CLASS_NAME));
+ transformerConfig.setProperties((Map<String, String>)
policyMap.get(TRANSFORMER_PROPERTIES_MAP));
+ } else {
+ transformerConfig = null;
+ }
+
+ final Map<String, Object> properties;
+
+ if (policyMap.containsKey(POLICY_PROPERTIES_MAP)) {
+ properties = (Map<String, Object>)
policyMap.get(POLICY_PROPERTIES_MAP);
+ } else {
+ properties = null;
+ }
+
+ return new FederationReceiveFromQueuePolicy(policyName,
includeFederated, priorityAdjustment,
+ includes, excludes,
properties, transformerConfig,
+ wildcardConfig);
+ } catch (ActiveMQException amqEx) {
+ throw amqEx;
+ } catch (Exception e) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Invalid encoded queue policy entry: " + e.getMessage());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Set<Map.Entry<String, String>>
decodeFlattenedFilterSet(Map<String, Object> policyMap, String target) throws
ActiveMQException {
+ final Object encodedObject = policyMap.get(target);
+
+ if (encodedObject == null) {
+ return Collections.EMPTY_SET;
+ }
+
+ if (!(encodedObject instanceof List)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Encoded queue policy entry was not the expected List type : " +
target);
+ }
+
+ final Set<Map.Entry<String, String>> policyEntrySet;
+
+ try {
+ final List<String> flattenedEntrySet = (List<String>) encodedObject;
+
+ if (flattenedEntrySet.isEmpty()) {
+ return Collections.EMPTY_SET;
+ }
+
+ if ((flattenedEntrySet.size() & 1) != 0) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Encoded queue policy entry was must contain an even number of
elements : " + target);
+ }
+
+ policyEntrySet = new HashSet<>(Math.max(2, flattenedEntrySet.size() /
2));
+
+ for (int i = 0; i < flattenedEntrySet.size(); ) {
+ policyEntrySet.add(new SimpleEntry<>(flattenedEntrySet.get(i++),
flattenedEntrySet.get(i++)));
+ }
+
+ } catch (ActiveMQException amqEx) {
+ throw amqEx;
+ } catch (Exception e) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Invalid encoded queue policy entry: " + e.getMessage());
+ }
+
+ return policyEntrySet;
+ }
+
+ /**
+ * Given an AMQP Message decode an {@link
FederationReceiveFromAddressPolicy} from it and return
+ * the decoded value. The message should have already been inspected and
determined to be an
+ * control message of the add to policy type.
+ *
+ * @param message
+ * The {@link AMQPMessage} that should carry an encoded {@link
FederationReceiveFromQueuePolicy}
+ * @param wildcardConfig
+ * The {@link WildcardConfiguration} to use in the decoded policy.
+ *
+ * @return a decoded {@link FederationReceiveFromAddressPolicy} instance.
+ *
+ * @throws ActiveMQException if an error occurs during the policy decode.
+ */
+ @SuppressWarnings("unchecked")
+ public static FederationReceiveFromAddressPolicy
decodeReceiveFromAddressPolicy(AMQPMessage message, WildcardConfiguration
wildcardConfig) throws ActiveMQException {
+ final Section body = message.getBody();
+
+ if (!(body instanceof AmqpValue)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body was not an AmqpValue type");
+ }
+
+ final AmqpValue bodyValue = (AmqpValue) body;
+
+ if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof
Map)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body AmqpValue did not carry an encoded Map");
+ }
+
+ try {
+ final Map<String, Object> policyMap = (Map<String, Object>)
bodyValue.getValue();
+
+ if (!policyMap.containsKey(POLICY_NAME)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body did not carry the required policy name");
+ }
+
+ if (!policyMap.containsKey(ADDRESS_MAX_HOPS)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body did not carry the required max hops
configuration");
+ }
+
+ final String policyName = (String) policyMap.get(POLICY_NAME);
+ final boolean autoDelete = (Boolean)
policyMap.getOrDefault(ADDRESS_AUTO_DELETE, false);
+ final long autoDeleteDelay = ((Number)
policyMap.getOrDefault(ADDRESS_AUTO_DELETE_DELAY, 0L)).longValue();
+ final long autoDeleteMsgCount = ((Number)
policyMap.getOrDefault(ADDRESS_AUTO_DELETE_MSG_COUNT, 0L)).longValue();
+ final int maxHops = ((Number)
policyMap.get(ADDRESS_MAX_HOPS)).intValue();
+ final boolean enableDiverts = (Boolean)
policyMap.getOrDefault(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+
+ final Set<String> includes;
+ final Set<String> excludes;
+
+ if (policyMap.containsKey(ADDRESS_INCLUDES)) {
+ includes = (Set<String>) new
HashSet<>((List<String>)policyMap.get(ADDRESS_INCLUDES));
+ } else {
+ includes = Collections.EMPTY_SET;
+ }
+
+ if (policyMap.containsKey(ADDRESS_EXCLUDES)) {
+ excludes = (Set<String>) new
HashSet<>((List<String>)policyMap.get(ADDRESS_EXCLUDES));
+ } else {
+ excludes = Collections.EMPTY_SET;
+ }
+
+ final TransformerConfiguration transformerConfig;
+
+ if (policyMap.containsKey(TRANSFORMER_CLASS_NAME)) {
+ transformerConfig = new TransformerConfiguration();
+ transformerConfig.setClassName((String)
policyMap.get(TRANSFORMER_CLASS_NAME));
+ transformerConfig.setProperties((Map<String, String>)
policyMap.get(TRANSFORMER_PROPERTIES_MAP));
+ } else {
+ transformerConfig = null;
+ }
+
+ final Map<String, Object> properties;
+
+ if (policyMap.containsKey(POLICY_PROPERTIES_MAP)) {
+ properties = (Map<String, Object>)
policyMap.get(POLICY_PROPERTIES_MAP);
+ } else {
+ properties = null;
+ }
+
+ return new FederationReceiveFromAddressPolicy(policyName, autoDelete,
autoDeleteDelay,
+ autoDeleteMsgCount,
maxHops, enableDiverts,
+ includes, excludes,
properties, transformerConfig,
+ wildcardConfig);
+ } catch (Exception e) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Invalid encoded address policy entry: " + e.getMessage());
+ }
+ }
+
+ /**
+ * From the broker AMQP broker connection configuration element and the
configured wild-card
+ * settings create an address match policy.
+ *
+ * @param element
+ * The broker connections element configuration that creates this policy.
+ * @param wildcards
+ * The configured wild-card settings for the broker or defaults.
+ *
+ * @return a new address match and handling policy for use in the broker
connection.
+ */
+ @SuppressWarnings("unchecked")
+ public static FederationReceiveFromAddressPolicy
create(AMQPFederationAddressPolicyElement element, WildcardConfiguration
wildcards) {
+ final Set<String> includes;
+ final Set<String> excludes;
+
+ if (element.getIncludes() != null && !element.getIncludes().isEmpty()) {
+ includes = new HashSet<>(element.getIncludes().size());
+
+ element.getIncludes().forEach(addressMatch ->
includes.add(addressMatch.getAddressMatch()));
+ } else {
+ includes = Collections.EMPTY_SET;
+ }
+
+ if (element.getExcludes() != null && !element.getIncludes().isEmpty()) {
Review Comment:
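Copy-and-paste error? This null-checks element.getExcludes() but then
empty-checks element.getIncludes(); the second check should presumably be
!element.getExcludes().isEmpty().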
##########
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/FederationConfiguration.java:
##########
@@ -74,7 +74,7 @@ public Map<String, FederationPolicy> getQueuePolicies() {
}
// strange spelling!, it allows a type match for singular of correct plural
Policies from properties
- public FederationConfiguration
addQueuePolicie(FederationQueuePolicyConfiguration federationPolicy) {
+ public FederationConfiguration
addQueuePolicy(FederationQueuePolicyConfiguration federationPolicy) {
Review Comment:
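The "strange spelling!" comment above this method is inaccurate now that the
method has been renamed from addQueuePolicie to addQueuePolicy; it should be
updated or removed.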
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java:
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+
+import java.util.AbstractMap;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
+import
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
+import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_NAME;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_EXCLUDES;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDES;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDE_FEDERATED;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_PRIORITY_ADJUSTMENT;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_CLASS_NAME;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_PROPERTIES_MAP;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_PROPERTIES_MAP;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_ADDRESS_POLICY;
+
+/**
+ * Tools used when loading AMQP Broker connections configuration that includes
Federation
+ * configuration.
+ */
+public final class AMQPFederationPolicySupport {
+
+ /**
+ * Default priority adjustment used for a federation queue match policy if
nothing
+ * was configured in the broker configuration file.
+ */
+ public static final int DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT = -1;
+
+ /**
+ * Annotation added to received messages from address consumers that
indicates how many
+ * times the message has traversed a federation link.
+ */
+ public static final Symbol MESSAGE_HOPS_ANNOTATION =
Symbol.valueOf("x-opt-amq-fed-hops");
+
+ /**
+ * Property name used to embed a nested map of properties meant to be
applied if the address
+ * indicated in an federation address receiver auto creates the federated
address.
+ */
+ public static final Symbol FEDERATED_ADDRESS_SOURCE_PROPERTIES =
Symbol.valueOf("federated-address-source-properties");
+
+ /**
+ * Create an AMQP Message used to instruct the remote peer that it should
perform
+ * Federation operations on the given {@link
FederationReceiveFromQueuePolicy}.
+ *
+ * @param policy
+ * The policy to encode into an AMQP message.
+ *
+ * @return an AMQP Message with the encoded policy.
+ */
+ public static AMQPMessage
encodeQueuePolicyControlMessage(FederationReceiveFromQueuePolicy policy) {
+ final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+ final MessageAnnotations messageAnnotations = new
MessageAnnotations(annotations);
+ final Map<String, Object> policyMap = new LinkedHashMap<>();
+ final Section sectionBody = new AmqpValue(policyMap);
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ annotations.put(OPERATION_TYPE, ADD_QUEUE_POLICY);
+
+ policyMap.put(POLICY_NAME, policy.getPolicyName());
+ policyMap.put(QUEUE_INCLUDE_FEDERATED, policy.isIncludeFederated());
+ policyMap.put(QUEUE_PRIORITY_ADJUSTMENT, policy.getPriorityAjustment());
+
+ if (!policy.getIncludes().isEmpty()) {
+ final List<String> flattenedIncludes = new
ArrayList<>(policy.getIncludes().size() * 2);
+ policy.getIncludes().forEach((entry) -> {
+ flattenedIncludes.add(entry.getKey());
+ flattenedIncludes.add(entry.getValue());
+ });
+
+ policyMap.put(QUEUE_INCLUDES, flattenedIncludes);
+ }
+
+ if (!policy.getExcludes().isEmpty()) {
+ final List<String> flatteneExcludes = new
ArrayList<>(policy.getExcludes().size() * 2);
+ policy.getExcludes().forEach((entry) -> {
+ flatteneExcludes.add(entry.getKey());
+ flatteneExcludes.add(entry.getValue());
+ });
+
+ policyMap.put(QUEUE_EXCLUDES, flatteneExcludes);
+ }
+
+ if (!policy.getProperties().isEmpty()) {
+ policyMap.put(POLICY_PROPERTIES_MAP, policy.getProperties());
+ }
+
+ if (policy.getTransformerConfiguration() != null) {
+ final TransformerConfiguration config =
policy.getTransformerConfiguration();
+
+ policyMap.put(TRANSFORMER_CLASS_NAME, config.getClassName());
+ if (!config.getProperties().isEmpty()) {
+ policyMap.put(TRANSFORMER_PROPERTIES_MAP, config.getProperties());
+ }
+ }
+
+ try {
+ final EncoderImpl encoder = TLSEncode.getEncoder();
+ encoder.setByteBuffer(new NettyWritable(buffer));
+ encoder.writeObject(messageAnnotations);
+ encoder.writeObject(sectionBody);
+
+ final byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
+
+ return new AMQPStandardMessage(0, data, null);
+ } finally {
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ buffer.release();
+ }
+ }
+
+ /**
+ * Create an AMQP Message used to instruct the remote peer that it should
perform
+ * Federation operations on the given {@link
FederationReceiveFromAddressPolicy}.
+ *
+ * @param policy
+ * The policy to encode into an AMQP message.
+ *
+ * @return an AMQP Message with the encoded policy.
+ */
+ public static AMQPMessage
encodeAddressPolicyControlMessage(FederationReceiveFromAddressPolicy policy) {
+ final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+ final MessageAnnotations messageAnnotations = new
MessageAnnotations(annotations);
+ final Map<String, Object> policyMap = new LinkedHashMap<>();
+ final Section sectionBody = new AmqpValue(policyMap);
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ annotations.put(OPERATION_TYPE, ADD_ADDRESS_POLICY);
+
+ policyMap.put(POLICY_NAME, policy.getPolicyName());
+ policyMap.put(ADDRESS_AUTO_DELETE, policy.isAutoDelete());
+ policyMap.put(ADDRESS_AUTO_DELETE_DELAY, policy.getAutoDeleteDelay());
+ policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT,
policy.getAutoDeleteMessageCount());
+ policyMap.put(ADDRESS_MAX_HOPS, policy.getMaxHops());
+ policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS,
policy.isEnableDivertBindings());
+ if (!policy.getIncludes().isEmpty()) {
+ policyMap.put(ADDRESS_INCLUDES, new
ArrayList<>(policy.getIncludes()));
+ }
+ if (!policy.getExcludes().isEmpty()) {
+ policyMap.put(ADDRESS_EXCLUDES, new
ArrayList<>(policy.getExcludes()));
+ }
+
+ if (!policy.getProperties().isEmpty()) {
+ policyMap.put(POLICY_PROPERTIES_MAP, policy.getProperties());
+ }
+
+ if (policy.getTransformerConfiguration() != null) {
+ final TransformerConfiguration config =
policy.getTransformerConfiguration();
+
+ policyMap.put(TRANSFORMER_CLASS_NAME, config.getClassName());
+ if (!config.getProperties().isEmpty()) {
+ policyMap.put(TRANSFORMER_PROPERTIES_MAP, config.getProperties());
+ }
+ }
+
+ try {
+ final EncoderImpl encoder = TLSEncode.getEncoder();
+ encoder.setByteBuffer(new NettyWritable(buffer));
+ encoder.writeObject(messageAnnotations);
+ encoder.writeObject(sectionBody);
+
+ final byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
+
+ return new AMQPStandardMessage(0, data, null);
+ } finally {
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ buffer.release();
+ }
+ }
+
+ /**
+ * Given an AMQP Message decode an {@link FederationReceiveFromQueuePolicy}
from it and return
+ * the decoded value. The message should have already been inspected and
determined to be an
+ * control message of the add to policy type.
+ *
+ * @param message
+ * The {@link AMQPMessage} that should carry an encoded {@link
FederationReceiveFromQueuePolicy}
+ * @param wildcardConfig
+ * The {@link WildcardConfiguration} to use in the decoded policy.
+ *
+ * @return a decoded {@link FederationReceiveFromQueuePolicy} instance.
+ *
+ * @throws ActiveMQException if an error occurs while decoding the policy.
+ */
+ @SuppressWarnings("unchecked")
+ public static FederationReceiveFromQueuePolicy
decodeReceiveFromQueuePolicy(AMQPMessage message, WildcardConfiguration
wildcardConfig) throws ActiveMQException {
+ final Section body = message.getBody();
+
+ if (!(body instanceof AmqpValue)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body was not an AmqpValue type");
+ }
+
+ final AmqpValue bodyValue = (AmqpValue) body;
+
+ if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof
Map)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body AmqpValue did not carry an encoded Map");
+ }
+
+ try {
+ final Map<String, Object> policyMap = (Map<String, Object>)
bodyValue.getValue();
+
+ if (!policyMap.containsKey(POLICY_NAME)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body did not carry the required policy name");
+ }
+
+ final String policyName = (String) policyMap.get(POLICY_NAME);
+ final boolean includeFederated = (boolean)
policyMap.getOrDefault(QUEUE_INCLUDE_FEDERATED, false);
+ final int priorityAdjustment = ((Number)
policyMap.getOrDefault(QUEUE_PRIORITY_ADJUSTMENT,
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)).intValue();
+ final Set<Map.Entry<String, String>> includes =
decodeFlattenedFilterSet(policyMap, QUEUE_INCLUDES);
+ final Set<Map.Entry<String, String>> excludes =
decodeFlattenedFilterSet(policyMap, QUEUE_EXCLUDES);
+ final TransformerConfiguration transformerConfig;
+
+ if (policyMap.containsKey(TRANSFORMER_CLASS_NAME)) {
+ transformerConfig = new TransformerConfiguration();
+ transformerConfig.setClassName((String)
policyMap.get(TRANSFORMER_CLASS_NAME));
+ transformerConfig.setProperties((Map<String, String>)
policyMap.get(TRANSFORMER_PROPERTIES_MAP));
+ } else {
+ transformerConfig = null;
+ }
+
+ final Map<String, Object> properties;
+
+ if (policyMap.containsKey(POLICY_PROPERTIES_MAP)) {
+ properties = (Map<String, Object>)
policyMap.get(POLICY_PROPERTIES_MAP);
+ } else {
+ properties = null;
+ }
+
+ return new FederationReceiveFromQueuePolicy(policyName,
includeFederated, priorityAdjustment,
+ includes, excludes,
properties, transformerConfig,
+ wildcardConfig);
+ } catch (ActiveMQException amqEx) {
+ throw amqEx;
+ } catch (Exception e) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Invalid encoded queue policy entry: " + e.getMessage());
+ }
+ }
+
+   /**
+    * Decodes the flattened list of filter pairs stored in the policy map under the given key.
+    * The list is read two elements at a time: the first element of each pair becomes the key
+    * and the second the value of an entry in the returned set.
+    *
+    * @param policyMap
+    *    The decoded policy map taken from the control message body.
+    * @param target
+    *    The key in the policy map whose value holds the flattened filter list.
+    *
+    * @return the decoded set of entries, or an empty set if the key is absent or the list is empty.
+    *
+    * @throws ActiveMQException if the value is not a List, has an odd number of elements or
+    *                           contains an element that is not a String.
+    */
+   private static Set<Map.Entry<String, String>> decodeFlattenedFilterSet(Map<String, Object> policyMap, String target) throws ActiveMQException {
+      final Object encodedObject = policyMap.get(target);
+
+      if (encodedObject == null) {
+         return Collections.emptySet();
+      }
+
+      if (!(encodedObject instanceof List)) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Encoded queue policy entry was not the expected List type : " + target);
+      }
+
+      final Set<Map.Entry<String, String>> policyEntrySet;
+
+      try {
+         // Kept as List<?> on purpose: an unchecked cast to List<String> would let elements of
+         // any type through without a runtime check.
+         final List<?> flattenedEntrySet = (List<?>) encodedObject;
+         final int size = flattenedEntrySet.size();
+
+         if (size == 0) {
+            return Collections.emptySet();
+         }
+
+         if ((size & 1) != 0) {
+            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Encoded queue policy entry must contain an even number of elements : " + target);
+         }
+
+         policyEntrySet = new HashSet<>(Math.max(2, size / 2));
+
+         for (int i = 0; i < size; i += 2) {
+            // The explicit casts reject non-String elements with a ClassCastException that is
+            // reported below as a malformed message; null elements pass through unchanged.
+            final String key = (String) flattenedEntrySet.get(i);
+            final String value = (String) flattenedEntrySet.get(i + 1);
+
+            policyEntrySet.add(new SimpleEntry<>(key, value));
+         }
+      } catch (ActiveMQException amqEx) {
+         throw amqEx;
+      } catch (Exception e) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded queue policy entry: " + e.getMessage());
+      }
+
+      return policyEntrySet;
+   }
+
+   /**
+    * Given an AMQP Message decode a {@link FederationReceiveFromAddressPolicy} from it and return
+    * the decoded value. The message should have already been inspected and determined to be a
+    * control message of the add to policy type.
+    * <p>
+    * The body must be an {@link AmqpValue} carrying a {@link Map} that contains at least the
+    * policy name and the max hops value. The auto delete settings and the divert bindings flag
+    * fall back to defaults when absent; the include and exclude lists, the policy properties and
+    * the transformer configuration are all optional.
+    *
+    * @param message
+    *    The {@link AMQPMessage} that should carry an encoded {@link FederationReceiveFromAddressPolicy}
+    * @param wildcardConfig
+    *    The {@link WildcardConfiguration} to use in the decoded policy.
+    *
+    * @return a decoded {@link FederationReceiveFromAddressPolicy} instance.
+    *
+    * @throws ActiveMQException if an error occurs during the policy decode.
+    */
+   @SuppressWarnings("unchecked")
+   public static FederationReceiveFromAddressPolicy decodeReceiveFromAddressPolicy(AMQPMessage message, WildcardConfiguration wildcardConfig) throws ActiveMQException {
+      final Section body = message.getBody();
+
+      if (!(body instanceof AmqpValue)) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body was not an AmqpValue type");
+      }
+
+      final AmqpValue bodyValue = (AmqpValue) body;
+
+      if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof Map)) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Message body AmqpValue did not carry an encoded Map");
+      }
+
+      try {
+         final Map<String, Object> policyMap = (Map<String, Object>) bodyValue.getValue();
+
+         if (!policyMap.containsKey(POLICY_NAME)) {
+            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required policy name");
+         }
+
+         if (!policyMap.containsKey(ADDRESS_MAX_HOPS)) {
+            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+               "Message body did not carry the required max hops configuration");
+         }
+
+         final String policyName = (String) policyMap.get(POLICY_NAME);
+         final boolean autoDelete = (Boolean) policyMap.getOrDefault(ADDRESS_AUTO_DELETE, false);
+         final long autoDeleteDelay = ((Number) policyMap.getOrDefault(ADDRESS_AUTO_DELETE_DELAY, 0L)).longValue();
+         final long autoDeleteMsgCount = ((Number) policyMap.getOrDefault(ADDRESS_AUTO_DELETE_MSG_COUNT, 0L)).longValue();
+         final int maxHops = ((Number) policyMap.get(ADDRESS_MAX_HOPS)).intValue();
+         final boolean enableDiverts = (Boolean) policyMap.getOrDefault(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+
+         final Set<String> includes;
+         final Set<String> excludes;
+
+         if (policyMap.containsKey(ADDRESS_INCLUDES)) {
+            includes = new HashSet<>((List<String>) policyMap.get(ADDRESS_INCLUDES));
+         } else {
+            includes = Collections.emptySet();
+         }
+
+         if (policyMap.containsKey(ADDRESS_EXCLUDES)) {
+            excludes = new HashSet<>((List<String>) policyMap.get(ADDRESS_EXCLUDES));
+         } else {
+            excludes = Collections.emptySet();
+         }
+
+         final TransformerConfiguration transformerConfig;
+
+         // A transformer is only configured when the sender encoded its class name.
+         if (policyMap.containsKey(TRANSFORMER_CLASS_NAME)) {
+            transformerConfig = new TransformerConfiguration();
+            transformerConfig.setClassName((String) policyMap.get(TRANSFORMER_CLASS_NAME));
+            transformerConfig.setProperties((Map<String, String>) policyMap.get(TRANSFORMER_PROPERTIES_MAP));
+         } else {
+            transformerConfig = null;
+         }
+
+         final Map<String, Object> properties;
+
+         if (policyMap.containsKey(POLICY_PROPERTIES_MAP)) {
+            properties = (Map<String, Object>) policyMap.get(POLICY_PROPERTIES_MAP);
+         } else {
+            properties = null;
+         }
+
+         return new FederationReceiveFromAddressPolicy(policyName, autoDelete, autoDeleteDelay,
+                                                       autoDeleteMsgCount, maxHops, enableDiverts,
+                                                       includes, excludes, properties, transformerConfig,
+                                                       wildcardConfig);
+      } catch (ActiveMQException amqEx) {
+         // The missing name / max hops errors raised above are already descriptive; rethrow
+         // them as-is instead of wrapping them in a second malformed message error.
+         throw amqEx;
+      } catch (Exception e) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+            "Invalid encoded address policy entry: " + e.getMessage());
+      }
+   }
+
+   /**
+    * From the broker AMQP broker connection configuration element and the configured wild-card
+    * settings create an address match policy.
+    * <p>
+    * The include and exclude address matches are copied out of the element into new sets. The
+    * auto delete flag, auto delete delay, auto delete message count and enable divert bindings
+    * values are replaced by <code>false</code> or <code>0</code> when the element returns
+    * <code>null</code> for them.
+    *
+    * @param element
+    *    The broker connections element configuration that creates this policy.
+    * @param wildcards
+    *    The configured wild-card settings for the broker or defaults.
+    *
+    * @return a new address match and handling policy for use in the broker connection.
+    */
+   public static FederationReceiveFromAddressPolicy create(AMQPFederationAddressPolicyElement element, WildcardConfiguration wildcards) {
+      final Set<String> includes;
+      final Set<String> excludes;
+
+      if (element.getIncludes() != null && !element.getIncludes().isEmpty()) {
+         includes = new HashSet<>(element.getIncludes().size());
+
+         element.getIncludes().forEach(addressMatch -> includes.add(addressMatch.getAddressMatch()));
+      } else {
+         includes = Collections.emptySet();
+      }
+
+      // The guard must look at the excludes themselves: checking the includes here would drop
+      // configured excludes whenever no includes are set, and fail if the includes are null.
+      if (element.getExcludes() != null && !element.getExcludes().isEmpty()) {
+         excludes = new HashSet<>(element.getExcludes().size());
+
+         element.getExcludes().forEach(addressMatch -> excludes.add(addressMatch.getAddressMatch()));
+      } else {
+         excludes = Collections.emptySet();
+      }
+
+      // We translate from broker configuration to actual implementation to avoid any coupling here
+      // as broker configuration could change and or be updated.
+
+      return new FederationReceiveFromAddressPolicy(
+         element.getName(),
+         element.getAutoDelete() == null ? false : element.getAutoDelete(),
+         element.getAutoDeleteDelay() == null ? 0 : element.getAutoDeleteDelay(),
+         element.getAutoDeleteMessageCount() == null ? 0 : element.getAutoDeleteMessageCount(),
+         element.getMaxHops(),
+         element.isEnableDivertBindings() == null ? false : element.isEnableDivertBindings(),
+         includes,
+         excludes,
+         element.getProperties(),
+         element.getTransformerConfiguration(),
+         wildcards);
+   }
+
+ /**
+ * From the broker AMQP broker connection configuration element and the
configured wild-card
+ * settings create a queue match policy. If not configured otherwise the
policy is always
+ * defaulted to a value of <code>-1</code> in order to attempt to prevent
federation consumers
+ * from consuming messages on the remote when a local consumer is present.
+ *
+ * @param element
+ * The broker connections element configuration that creates this policy.
+ * @param wildcards
+ * The configured wild-card settings for the broker or defaults.
+ *
+ * @return a new queue match and handling policy for use in the broker
connection.
+ */
+ @SuppressWarnings("unchecked")
+ public static FederationReceiveFromQueuePolicy
create(AMQPFederationQueuePolicyElement element, WildcardConfiguration
wildcards) {
+ final Set<Map.Entry<String, String>> includes;
+ final Set<Map.Entry<String, String>> excludes;
+
+ if (element.getIncludes() != null && !element.getIncludes().isEmpty()) {
+ includes = new HashSet<>(element.getIncludes().size());
+
+ element.getIncludes().forEach(queueMatch ->
+ includes.add(new AbstractMap.SimpleImmutableEntry<String,
String>(queueMatch.getAddressMatch(), queueMatch.getQueueMatch())));
+ } else {
+ includes = Collections.EMPTY_SET;
+ }
+
+ if (element.getExcludes() != null && !element.getIncludes().isEmpty()) {
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]