gemmellr commented on code in PR #4817:
URL: https://github.com/apache/activemq-artemis/pull/4817#discussion_r1486082922
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java:
##########
@@ -294,14 +315,232 @@ protected boolean interceptLinkClosedEvent(Link link) {
return false;
}
+ private void asyncCreateTargetEventsSender(AMQPFederationCommandDispatcher
commandLink) {
+ // If no remote policies configured then we don't need an events sender
link
+ // currently, if some other use is added for this link this code must be
+ // removed and tests updated to expect this link to always be created.
+ if (remoteAddressMatchPolicies.isEmpty() &&
remoteQueueMatchPolicies.isEmpty()) {
+ return;
+ }
+
+ // Schedule the outgoing event link creation on the connection event
loop thread.
+ //
+ // Eventual establishment of the outgoing events link or refusal informs
this side
+ // of the connection as to whether the remote side supports receiving
events for
+ // resources that it attempted to federate but they did not exist at the
time and
+ // were subsequently added or for resources that might have been later
removed via
+ // management and then subsequently re-added.
+ //
+ // Once the outcome of the event link is known then send any remote
address or queue
+ // federation policies so that the remote can start federation of local
addresses or
+ // queues to itself. This ordering prevents and race on creation of the
events link
+ // and any federation consumer creation from the remote.
+ connection.runLater(() -> {
+ if (!isStarted()) {
+ return;
+ }
+
+ try {
+ final Sender sender = session.getSession().sender(
+ "Federation-events-sender:" + getName() + ":" +
UUIDGenerator.getInstance().generateStringUUID());
+ final Target target = new Target();
+ final Source source = new Source();
+
+ target.setDynamic(true);
+ target.setCapabilities(new Symbol[]
{Symbol.valueOf("temporary-topic")});
Review Comment:
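Use the existing AmqpSupport.TEMP_TOPIC_CAPABILITY constant here instead of creating the Symbol inline with Symbol.valueOf("temporary-topic").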
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java:
##########
@@ -294,14 +315,232 @@ protected boolean interceptLinkClosedEvent(Link link) {
return false;
}
+ private void asyncCreateTargetEventsSender(AMQPFederationCommandDispatcher
commandLink) {
+ // If no remote policies configured then we don't need an events sender
link
+ // currently, if some other use is added for this link this code must be
+ // removed and tests updated to expect this link to always be created.
+ if (remoteAddressMatchPolicies.isEmpty() &&
remoteQueueMatchPolicies.isEmpty()) {
+ return;
+ }
+
+ // Schedule the outgoing event link creation on the connection event
loop thread.
+ //
+ // Eventual establishment of the outgoing events link or refusal informs
this side
+ // of the connection as to whether the remote side supports receiving
events for
+ // resources that it attempted to federate but they did not exist at the
time and
+ // were subsequently added or for resources that might have been later
removed via
+ // management and then subsequently re-added.
+ //
+ // Once the outcome of the event link is known then send any remote
address or queue
+ // federation policies so that the remote can start federation of local
addresses or
+ // queues to itself. This ordering prevents and race on creation of the
events link
+ // and any federation consumer creation from the remote.
+ connection.runLater(() -> {
+ if (!isStarted()) {
+ return;
+ }
+
+ try {
+ final Sender sender = session.getSession().sender(
+ "Federation-events-sender:" + getName() + ":" +
UUIDGenerator.getInstance().generateStringUUID());
+ final Target target = new Target();
+ final Source source = new Source();
+
+ target.setDynamic(true);
+ target.setCapabilities(new Symbol[]
{Symbol.valueOf("temporary-topic")});
+ target.setDurable(TerminusDurability.NONE);
+ target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ // Set the dynamic node lifetime-policy to indicate this needs to
be destroyed on close
+ // we don't want event links nodes remaining once a federation
connection is closed.
+ final Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
+ dynamicNodeProperties.put(AmqpSupport.LIFETIME_POLICY,
DeleteOnClose.getInstance());
+ target.setDynamicNodeProperties(dynamicNodeProperties);
+
+ sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+ sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ sender.setDesiredCapabilities(EVENT_LINK_CAPABILITIES);
+ sender.setTarget(target);
+ sender.setSource(source);
+ sender.open();
+
+ final ScheduledFuture<?> futureTimeout;
+ final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+ if (brokerConnection.getConnectionTimeout() > 0) {
+ futureTimeout =
brokerConnection.getServer().getScheduledPool().schedule(() -> {
+ cancelled.set(true);
+
brokerConnection.connectError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout());
+ }, brokerConnection.getConnectionTimeout(),
TimeUnit.MILLISECONDS);
+ } else {
+ futureTimeout = null;
+ }
+
+ // Using attachments to set up a Runnable that will be executed
inside the remote link opened handler
+ sender.attachments().set(AMQP_LINK_INITIALIZER_KEY,
Runnable.class, () -> {
+ try {
+ if (cancelled.get()) {
+ return;
+ }
+
+ if (futureTimeout != null) {
+ futureTimeout.cancel(false);
+ }
+
+ if (sender.getRemoteTarget() == null ||
!AmqpSupport.verifyOfferedCapabilities(sender)) {
+ // Sender rejected or not an event link endpoint so close
as we will
+ // not support sending events to the remote but otherwise
will operate
+ // as normal.
+ sender.close();
+ } else {
+ session.addFederationEventDispatcher(sender);
+ }
+
+ // Once we know whether the events support is active or not
we can send
+ // the remote federation policies and allow the remote
federation links
+ // to start forming.
+
+ remoteQueueMatchPolicies.forEach((key, policy) -> {
+ try {
+ commandLink.sendPolicy(policy);
+ } catch (Exception e) {
+ brokerConnection.error(e);
+ }
+ });
+
+ remoteAddressMatchPolicies.forEach((key, policy) -> {
+ try {
+ commandLink.sendPolicy(policy);
+ } catch (Exception e) {
+ brokerConnection.error(e);
+ }
+ });
+
+ } catch (Exception e) {
+ brokerConnection.error(e);
+ }
+ });
+ } catch (Exception e) {
+ brokerConnection.error(e);
+ }
+
+ connection.flush();
+ });
+ }
+
+ private void asnycCreateTargetEventsReceiver() {
+ // If no local policies configured then we don't need an events receiver
link
+ // currently, if some other use is added for this link this code must be
+ // removed and tests updated to expect this link to always be created.
+ if (addressMatchPolicies.isEmpty() && queueMatchPolicies.isEmpty()) {
+ return;
+ }
+
+ // Schedule the incoming event link creation on the connection event
loop thread.
+ //
+ // Eventual establishment of the incoming event link or refusal informs
this side
+ // of the connection as to whether the remote will send events for
addresses or
+ // queues that were not present when a federation consumer attempt had
failed and
+ // were later added or an existing federation consumer was closed due to
management
+ // action and those resource are once again available for federation.
+ //
+ // Once the outcome of the event link is known then start all the policy
managers
+ // which will start federation from remote addresses and queues to this
broker.
+ // This ordering prevents any races around the events receiver creation
and creation
+ // of federation consumers on the remote.
+ connection.runLater(() -> {
+ if (!isStarted()) {
+ return;
+ }
+
+ try {
+ final Receiver receiver = session.getSession().receiver(
+ "Federation-events-receiver:" + getName() + ":" +
UUIDGenerator.getInstance().generateStringUUID());
+
+ final Target target = new Target();
+ final Source source = new Source();
+
+ source.setDynamic(true);
+ source.setCapabilities(new Symbol[]
{Symbol.valueOf("temporary-topic")});
Review Comment:
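Use the existing AmqpSupport.TEMP_TOPIC_CAPABILITY constant here instead of creating the Symbol inline with Symbol.valueOf("temporary-topic").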
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventSupport.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.EVENT_TYPE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_ADDRESS_ADDED;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_ADDRESS_NAME;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_QUEUE_ADDED;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_QUEUE_NAME;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+/**
+ * Tools used for sending and receiving events inside AMQP message instance.
+ */
+public final class AMQPFederationEventSupport {
+
+ /**
+ * Encode an event that indicates that a Queue that belongs to a federation
+ * request which was not present at the time of the request or was later
removed
+ * is now present and the remote should check for demand and attempt to
federate
+ * the resource once again.
+ *
+ * @param address
+ * The address that the queue is currently bound to.
+ * @param queue
+ * The queue that was part of a previous federation request.
+ *
+ * @return the AMQP message with the encoded event data.
+ */
+ public static AMQPMessage encodeQueueAddedEvent(String address, String
queue) {
+ final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+ final MessageAnnotations messageAnnotations = new
MessageAnnotations(annotations);
+ final Map<String, Object> eventMap = new LinkedHashMap<>();
+ final Section sectionBody = new AmqpValue(eventMap);
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ annotations.put(EVENT_TYPE, REQUESTED_QUEUE_ADDED);
+
+ eventMap.put(REQUESTED_ADDRESS_NAME, address);
+ eventMap.put(REQUESTED_QUEUE_NAME, queue);
+
+ try {
+ final EncoderImpl encoder = TLSEncode.getEncoder();
+ encoder.setByteBuffer(new NettyWritable(buffer));
+ encoder.writeObject(messageAnnotations);
+ encoder.writeObject(sectionBody);
+
+ final byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
+
+ return new AMQPStandardMessage(0, data, null);
+ } finally {
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ buffer.release();
+ }
+ }
+
+ /**
+ * Encode an event that indicates that an Address that belongs to a
federation
+ * request which was not present at the time of the request or was later
removed
+ * is now present and the remote should check for demand and attempt to
federate
+ * the resource once again.
+ *
+ * @param address
+ * The address portion of the previously failed federation request
+ *
+ * @return the AMQP message with the encoded event data.
+ */
+ public static AMQPMessage encodeAddressAddedEvent(String address) {
+ final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+ final MessageAnnotations messageAnnotations = new
MessageAnnotations(annotations);
+ final Map<String, Object> eventMap = new LinkedHashMap<>();
+ final Section sectionBody = new AmqpValue(eventMap);
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ annotations.put(EVENT_TYPE, REQUESTED_ADDRESS_ADDED);
+
+ eventMap.put(REQUESTED_ADDRESS_NAME, address);
+
+ try {
+ final EncoderImpl encoder = TLSEncode.getEncoder();
+ encoder.setByteBuffer(new NettyWritable(buffer));
+ encoder.writeObject(messageAnnotations);
+ encoder.writeObject(sectionBody);
+
+ final byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
+
+ return new AMQPStandardMessage(0, data, null);
+ } finally {
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ buffer.release();
+ }
+ }
+
+ /**
+ * Decode and return the Map containing the event data for a Queue that was
+ * the target of a previous federation request which was not present on the
+ * remote server or was later removed has now been (re)added.
+ *
+ * @param message
+ * The event message that carries the event data in its body.
+ *
+ * @return a {@link Map} containing the payload of the incoming event.
+ *
+ * @throws ActiveMQException if an error occurs while decoding the event
data.
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> decodeQueueAddedEvent(AMQPMessage
message) throws ActiveMQException {
+ final Section body = message.getBody();
+
+ if (!(body instanceof AmqpValue)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body was not an AmqpValue type");
+ }
+
+ final AmqpValue bodyValue = (AmqpValue) body;
+
+ if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof
Map)) {
Review Comment:
Superfluous null check: `instanceof` already evaluates to false for a null value, so the `== null` test can be dropped.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java:
##########
@@ -294,14 +315,232 @@ protected boolean interceptLinkClosedEvent(Link link) {
return false;
}
+ private void asyncCreateTargetEventsSender(AMQPFederationCommandDispatcher
commandLink) {
+ // If no remote policies configured then we don't need an events sender
link
+ // currently, if some other use is added for this link this code must be
+ // removed and tests updated to expect this link to always be created.
+ if (remoteAddressMatchPolicies.isEmpty() &&
remoteQueueMatchPolicies.isEmpty()) {
+ return;
+ }
+
+ // Schedule the outgoing event link creation on the connection event
loop thread.
+ //
+ // Eventual establishment of the outgoing events link or refusal informs
this side
+ // of the connection as to whether the remote side supports receiving
events for
+ // resources that it attempted to federate but they did not exist at the
time and
+ // were subsequently added or for resources that might have been later
removed via
+ // management and then subsequently re-added.
+ //
+ // Once the outcome of the event link is known then send any remote
address or queue
+ // federation policies so that the remote can start federation of local
addresses or
+ // queues to itself. This ordering prevents and race on creation of the
events link
Review Comment:
Typo: "prevents and race" -> "prevents a race"?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventSupport.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.EVENT_TYPE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_ADDRESS_ADDED;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_ADDRESS_NAME;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_QUEUE_ADDED;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.REQUESTED_QUEUE_NAME;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
+/**
+ * Tools used for sending and receiving events inside AMQP message instance.
+ */
+public final class AMQPFederationEventSupport {
+
+ /**
+ * Encode an event that indicates that a Queue that belongs to a federation
+ * request which was not present at the time of the request or was later
removed
+ * is now present and the remote should check for demand and attempt to
federate
+ * the resource once again.
+ *
+ * @param address
+ * The address that the queue is currently bound to.
+ * @param queue
+ * The queue that was part of a previous federation request.
+ *
+ * @return the AMQP message with the encoded event data.
+ */
+ public static AMQPMessage encodeQueueAddedEvent(String address, String
queue) {
+ final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+ final MessageAnnotations messageAnnotations = new
MessageAnnotations(annotations);
+ final Map<String, Object> eventMap = new LinkedHashMap<>();
+ final Section sectionBody = new AmqpValue(eventMap);
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ annotations.put(EVENT_TYPE, REQUESTED_QUEUE_ADDED);
+
+ eventMap.put(REQUESTED_ADDRESS_NAME, address);
+ eventMap.put(REQUESTED_QUEUE_NAME, queue);
+
+ try {
+ final EncoderImpl encoder = TLSEncode.getEncoder();
+ encoder.setByteBuffer(new NettyWritable(buffer));
+ encoder.writeObject(messageAnnotations);
+ encoder.writeObject(sectionBody);
+
+ final byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
+
+ return new AMQPStandardMessage(0, data, null);
+ } finally {
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ buffer.release();
+ }
+ }
+
+ /**
+ * Encode an event that indicates that an Address that belongs to a
federation
+ * request which was not present at the time of the request or was later
removed
+ * is now present and the remote should check for demand and attempt to
federate
+ * the resource once again.
+ *
+ * @param address
+ * The address portion of the previously failed federation request
+ *
+ * @return the AMQP message with the encoded event data.
+ */
+ public static AMQPMessage encodeAddressAddedEvent(String address) {
+ final Map<Symbol, Object> annotations = new LinkedHashMap<>();
+ final MessageAnnotations messageAnnotations = new
MessageAnnotations(annotations);
+ final Map<String, Object> eventMap = new LinkedHashMap<>();
+ final Section sectionBody = new AmqpValue(eventMap);
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ annotations.put(EVENT_TYPE, REQUESTED_ADDRESS_ADDED);
+
+ eventMap.put(REQUESTED_ADDRESS_NAME, address);
+
+ try {
+ final EncoderImpl encoder = TLSEncode.getEncoder();
+ encoder.setByteBuffer(new NettyWritable(buffer));
+ encoder.writeObject(messageAnnotations);
+ encoder.writeObject(sectionBody);
+
+ final byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
+
+ return new AMQPStandardMessage(0, data, null);
+ } finally {
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ buffer.release();
+ }
+ }
+
+ /**
+ * Decode and return the Map containing the event data for a Queue that was
+ * the target of a previous federation request which was not present on the
+ * remote server or was later removed has now been (re)added.
+ *
+ * @param message
+ * The event message that carries the event data in its body.
+ *
+ * @return a {@link Map} containing the payload of the incoming event.
+ *
+ * @throws ActiveMQException if an error occurs while decoding the event
data.
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> decodeQueueAddedEvent(AMQPMessage
message) throws ActiveMQException {
+ final Section body = message.getBody();
+
+ if (!(body instanceof AmqpValue)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body was not an AmqpValue type");
+ }
+
+ final AmqpValue bodyValue = (AmqpValue) body;
+
+ if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof
Map)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body AmqpValue did not carry an encoded Map");
+ }
+
+ try {
+ final Map<String, Object> eventMap = (Map<String, Object>)
bodyValue.getValue();
+
+ if (!eventMap.containsKey(REQUESTED_ADDRESS_NAME)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationEventMessage(
+ "Message body did not carry the required address name");
+ }
+
+ if (!eventMap.containsKey(REQUESTED_QUEUE_NAME)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationEventMessage(
+ "Message body did not carry the required queue name");
+ }
+
+ return eventMap;
+ } catch (ActiveMQException amqEx) {
+ throw amqEx;
+ } catch (Exception e) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Invalid encoded queue added event entry: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Decode and return the Map containing the event data for an Address that
was
+ * the target of a previous federation request which was not present on the
+ * remote server or was later removed has now been (re)added.
+ *
+ * @param message
+ * The event message that carries the event data in its body.
+ *
+ * @return a {@link Map} containing the payload of the incoming event.
+ *
+ * @throws ActiveMQException if an error occurs while decoding the event
data.
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> decodeAddressAddedEvent(AMQPMessage
message) throws ActiveMQException {
+ final Section body = message.getBody();
+
+ if (!(body instanceof AmqpValue)) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
+ "Message body was not an AmqpValue type");
+ }
+
+ final AmqpValue bodyValue = (AmqpValue) body;
+
+ if (bodyValue.getValue() == null || !(bodyValue.getValue() instanceof
Map)) {
Review Comment:
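Same as in decodeQueueAddedEvent: the null check is superfluous because `instanceof` already evaluates to false for a null value.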
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]