fixing Proton tests
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7174b536 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7174b536 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7174b536 Branch: refs/heads/artemis-1009 Commit: 7174b5363b016f8425f1ee4543e81e7ecba41740 Parents: e85f755 Author: Clebert Suconic <[email protected]> Authored: Tue Feb 28 22:12:13 2017 -0500 Committer: Clebert Suconic <[email protected]> Committed: Thu Mar 2 10:05:21 2017 -0500 ---------------------------------------------------------------------- .../protocol/amqp/broker/AMQPMessage.java | 149 +++++++++++-------- .../amqp/broker/AMQPSessionCallback.java | 2 +- .../artemis/core/filter/impl/FilterImpl.java | 7 +- .../impl/journal/LargeServerMessageImpl.java | 2 +- .../core/server/impl/ServerSessionImpl.java | 2 +- 5 files changed, 100 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7174b536/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index c530c94..1cb85ea 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -19,6 +19,8 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Set; import io.netty.buffer.ByteBuf; @@ -49,16 +51,15 @@ import org.apache.qpid.proton.util.TLSEncoder; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPMessage extends RefCountMessage { - private volatile int memoryEstimate = -1; - final long messageFormat; - private ProtonProtocolManager protocolManager; ByteBuf data; boolean bufferValid; byte type; long messageID; String address; MessageImpl protonMessage; + private volatile int memoryEstimate = -1; + private ProtonProtocolManager protocolManager; private long expiration = 0; // this can be used to encode the header again and the rest of the message buffer private int headerEnd = -1; @@ -124,6 +125,22 @@ public class AMQPMessage extends RefCountMessage { } } + private Map getApplicationPropertiesMap() { + + ApplicationProperties appMap = getApplicationProperties(); + Map map = null; + + if (appMap != null) { + map = appMap.getValue(); + } + + if (map == null) { + return Collections.emptyMap(); + } else { + return map; + } + } + private ApplicationProperties getApplicationProperties() { if (applicationProperties == null) { if (data != null) { @@ -347,7 +364,7 @@ public class AMQPMessage extends RefCountMessage { @Override public Object getUserID() { - return null; + return getProperties().getMessageId(); } @Override @@ -415,17 +432,17 @@ public class AMQPMessage extends RefCountMessage { } @Override - public AMQPMessage setAddress(SimpleString address) { - return setAddress(address.toString()); - } - - @Override public AMQPMessage setAddress(String address) { this.address = address; return this; } @Override + public AMQPMessage setAddress(SimpleString address) { + return setAddress(address.toString()); + } + + @Override public SimpleString getAddressSimpleString() { return SimpleString.toSimpleString(getAddress()); } @@ -492,244 +509,256 @@ public class AMQPMessage extends RefCountMessage { @Override public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) { - return null; + getApplicationPropertiesMap().put(key, Boolean.valueOf(value)); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) { - return null; + getApplicationPropertiesMap().put(key, Byte.valueOf(value)); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) { - return null; + getApplicationPropertiesMap().put(key, value); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) { - return null; + getApplicationPropertiesMap().put(key, Short.valueOf(value)); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) { - return null; + getApplicationPropertiesMap().put(key, Character.valueOf(value)); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) { - return null; + getApplicationPropertiesMap().put(key, Integer.valueOf(value)); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) { - return null; + getApplicationPropertiesMap().put(key, Long.valueOf(value)); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) { - return null; + getApplicationPropertiesMap().put(key, Float.valueOf(value)); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) { - return null; + getApplicationPropertiesMap().put(key, Double.valueOf(value)); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) { - return null; + getApplicationPropertiesMap().put(key, Boolean.valueOf(value)); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putByteProperty(SimpleString key, byte value) { - return null; + return putByteProperty(key.toString(), value); } @Override public org.apache.activemq.artemis.api.core.Message putBytesProperty(SimpleString key, byte[] value) { - return null; + return putBytesProperty(key.toString(), value); } @Override public org.apache.activemq.artemis.api.core.Message putShortProperty(SimpleString key, short value) { - return null; + return putShortProperty(key.toString(), value); } @Override public org.apache.activemq.artemis.api.core.Message putCharProperty(SimpleString key, char value) { - return null; + return putCharProperty(key.toString(), value); } @Override public org.apache.activemq.artemis.api.core.Message putIntProperty(SimpleString key, int value) { - return null; + return putIntProperty(key.toString(), value); } @Override public org.apache.activemq.artemis.api.core.Message putLongProperty(SimpleString key, long value) { - return null; + return putLongProperty(key.toString(), value); } @Override public org.apache.activemq.artemis.api.core.Message putFloatProperty(SimpleString key, float value) { - return null; + return putFloatProperty(key.toString(), value); } @Override public org.apache.activemq.artemis.api.core.Message putDoubleProperty(SimpleString key, double value) { - return null; + return putDoubleProperty(key.toString(), value); } @Override public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) { - return null; + getApplicationPropertiesMap().put(key, value); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException { - return null; + getApplicationPropertiesMap().put(key, value); + return this; } @Override public org.apache.activemq.artemis.api.core.Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException { - return null; + return putObjectProperty(key.toString(), value); } @Override public Object removeProperty(String key) { - return null; + return getApplicationPropertiesMap().remove(key); } @Override public boolean containsProperty(String key) { - return false; + return getApplicationPropertiesMap().containsKey(key); } @Override public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException { - return null; + return (Boolean)getApplicationPropertiesMap().get(key); } @Override public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException { - return null; + return (Byte)getApplicationPropertiesMap().get(key); } @Override public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException { - return null; + return (Double)getApplicationPropertiesMap().get(key); } @Override public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException { - return null; + return (Integer)getApplicationPropertiesMap().get(key); } @Override public Long getLongProperty(String key) throws ActiveMQPropertyConversionException { - return null; + return (Long)getApplicationPropertiesMap().get(key); } @Override public Object getObjectProperty(String key) { - return null; + return getApplicationPropertiesMap().get(key); } @Override public Short getShortProperty(String key) throws ActiveMQPropertyConversionException { - return null; + return (Short)getApplicationPropertiesMap().get(key); } @Override public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException { - return null; + return (Float)getApplicationPropertiesMap().get(key); } @Override public String getStringProperty(String key) throws ActiveMQPropertyConversionException { - return null; + return (String)getApplicationPropertiesMap().get(key); } @Override public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { - return null; + return SimpleString.toSimpleString((String)getApplicationPropertiesMap().get(key)); } @Override public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException { - return new byte[0]; + return (byte[]) getApplicationPropertiesMap().get(key); } @Override public Object removeProperty(SimpleString key) { - return null; + return removeProperty(key.toString()); } @Override public boolean containsProperty(SimpleString key) { - return false; + return containsProperty(key.toString()); } @Override public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException { - return null; + return getBooleanProperty(key.toString()); } @Override public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException { - return null; + return getByteProperty(key.toString()); } @Override public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException { - return null; + return getDoubleProperty(key.toString()); } @Override public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException { - return null; + return getIntProperty(key.toString()); } @Override public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException { - return null; + return getLongProperty(key.toString()); } @Override public Object getObjectProperty(SimpleString key) { - return null; + return getObjectProperty(key.toString()); } @Override public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException { - return null; + return getShortProperty(key.toString()); } @Override public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException { - return null; + return getFloatProperty(key.toString()); } @Override public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException { - return null; + return getStringProperty(key.toString()); } @Override public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException { - return null; + return getSimpleStringProperty(key.toString()); } @Override public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException { - return new byte[0]; + return getBytesProperty(key.toString()); } @Override public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, SimpleString value) { - return null; + return putStringProperty(key.toString(), value.toString()); } @Override @@ -739,7 +768,11 @@ public class AMQPMessage extends RefCountMessage { @Override public Set<SimpleString> getPropertyNames() { - return Collections.emptySet(); + HashSet<SimpleString> values = new HashSet<>(); + for (Object k : getApplicationPropertiesMap().keySet()) { + values.add(SimpleString.toSimpleString(k.toString())); + } + return values; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7174b536/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 0b02838..f34298c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -401,7 +401,7 @@ public class AMQPSessionCallback implements SessionCallback { final Receiver receiver) throws Exception { try { - message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer()); +// message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer()); serverSession.send(transaction, message, false, false); // FIXME Potential race here... http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7174b536/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java index 9d321c7..33a1187 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java @@ -158,7 +158,12 @@ public class FilterImpl implements Filter { } } // It's the stringified (hex) representation of a user id that can be used in a selector expression - return new SimpleString("ID:" + msg.getUserID()); + String userID = msg.getUserID().toString(); + if (userID.startsWith("ID:")) { + return SimpleString.toSimpleString(userID); + } else { + return new SimpleString("ID:" + msg.getUserID()); + } } else if (FilterConstants.ACTIVEMQ_PRIORITY.equals(fieldName)) { return Integer.valueOf(msg.getPriority()); } else if (FilterConstants.ACTIVEMQ_TIMESTAMP.equals(fieldName)) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7174b536/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 817a56a..1b5c24e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -272,7 +272,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe file.open(); file.position(0); - for (; ; ) { + for (;;) { // The buffer is reused... // We need to make sure we clear the limits and the buffer before reusing it buffer.clear(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7174b536/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 27fbdcb..6d95341 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1661,7 +1661,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (value == null) { // TODO-now: userID - targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>((UUID)msg.getUserID(), new AtomicLong(1))); +// targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>((UUID)msg.getUserID(), new AtomicLong(1))); } else { // TODO-now: userID value.setA((UUID)msg.getUserID());
