This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new a5712ccd PROTON-2616 Relax filters API to accept Map with object
entries
a5712ccd is described below
commit a5712ccd2370cc066ca4c11c3c1c17001d824fcd
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Oct 4 18:37:20 2022 -0400
PROTON-2616 Relax filters API to accept Map with object entries
Allows AMQP types or custom DescribedType entries as values in the filters
map, which makes it possible to define JMS AMQP style selectors or other
custom selector types, depending on how the remote peer handles selectors.
---
.../apache/qpid/protonj2/client/SourceOptions.java | 6 +-
.../qpid/protonj2/client/impl/ReceiverTest.java | 113 ++++++++++++++++++++-
.../qpid/protonj2/client/impl/SenderTest.java | 7 +-
3 files changed, 113 insertions(+), 13 deletions(-)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
index 51def8aa..0ade3a1c 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
@@ -39,7 +39,7 @@ public final class SourceOptions extends
TerminusOptions<SourceOptions> implemen
private DistributionMode distributionMode;
private DeliveryState defaultOutcome;
private DeliveryState.Type[] outcomes = DEFAULT_OUTCOMES;
- private Map<String, String> filters;
+ private Map<String, Object> filters;
@Override
public SourceOptions clone() {
@@ -85,7 +85,7 @@ public final class SourceOptions extends
TerminusOptions<SourceOptions> implemen
/**
* @return the filters
*/
- public Map<String, String> filters() {
+ public Map<String, Object> filters() {
return filters;
}
@@ -94,7 +94,7 @@ public final class SourceOptions extends
TerminusOptions<SourceOptions> implemen
*
* @return this {@link SourceOptions} instance.
*/
- public SourceOptions filters(Map<String, String> filters) {
+ public SourceOptions filters(Map<String, Object> filters) {
this.filters = filters;
return self();
}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index d3fbae21..9c7adcc4 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -70,6 +70,9 @@ import
org.apache.qpid.protonj2.test.driver.codec.messaging.Modified;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Released;
import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusDurability;
import
org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusExpiryPolicy;
+import
org.apache.qpid.protonj2.test.driver.codec.primitives.UnknownDescribedType;
+import org.apache.qpid.protonj2.types.DescribedType;
+import org.apache.qpid.protonj2.types.UnsignedLong;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.messaging.Data;
import org.apache.qpid.protonj2.types.messaging.Section;
@@ -2527,10 +2530,7 @@ public class ReceiverTest extends
ImperativeClientTestCase {
@Test
public void testCreateReceiverWithUserConfiguredSourceAndTargetOptions()
throws Exception {
- final Map<String, Object> filtersToObject = new HashMap<>();
- filtersToObject.put("x-opt-filter", "a = b");
-
- final Map<String, String> filters = new HashMap<>();
+ final Map<String, Object> filters = new HashMap<>();
filters.put("x-opt-filter", "a = b");
try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -2545,7 +2545,110 @@ public class ReceiverTest extends
ImperativeClientTestCase {
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
.withDefaultOutcome(new Released())
.withCapabilities("QUEUE")
- .withFilter(filtersToObject)
+ .withFilter(filters)
+
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
+ .also()
+ .withTarget().withAddress(notNullValue())
+ .withCapabilities("QUEUE")
+
.withDurable(TerminusDurability.CONFIGURATION)
+
.withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
+ .withTimeout(42)
+ .withDynamic(anyOf(nullValue(),
equalTo(false)))
+
.withDynamicNodeProperties(nullValue())
+ .and().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectDetach().respond();
+ peer.expectEnd().respond();
+ peer.expectClose().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort());
+ Session session = connection.openSession();
+ ReceiverOptions receiverOptions = new ReceiverOptions();
+
+ receiverOptions.sourceOptions().capabilities("QUEUE");
+
receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY);
+ receiverOptions.sourceOptions().timeout(128);
+
receiverOptions.sourceOptions().durabilityMode(DurabilityMode.UNSETTLED_STATE);
+
receiverOptions.sourceOptions().expiryPolicy(ExpiryPolicy.CONNECTION_CLOSE);
+
receiverOptions.sourceOptions().defaultOutcome(DeliveryState.released());
+ receiverOptions.sourceOptions().filters(filters);
+
receiverOptions.sourceOptions().outcomes(DeliveryState.Type.ACCEPTED,
DeliveryState.Type.REJECTED);
+
+ receiverOptions.targetOptions().capabilities("QUEUE");
+
receiverOptions.targetOptions().durabilityMode(DurabilityMode.CONFIGURATION);
+
receiverOptions.targetOptions().expiryPolicy(ExpiryPolicy.SESSION_CLOSE);
+ receiverOptions.targetOptions().timeout(42);
+
+ Receiver receiver = session.openReceiver("test-queue",
receiverOptions).openFuture().get();
+
+ receiver.close();
+ session.close();
+ connection.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ private class AmqpJmsSelectorType implements DescribedType {
+
+ private final String selector;
+
+ public AmqpJmsSelectorType(String selector) {
+ this.selector = selector;
+ }
+
+ @Override
+ public Object getDescriptor() {
+ return UnsignedLong.valueOf(0x0000468C00000004L);
+ }
+
+ @Override
+ public Object getDescribed() {
+ return this.selector;
+ }
+
+ @Override
+ public String toString() {
+ return "AmqpJmsSelectorType{" + selector + "}";
+ }
+ }
+
+ private class PeerJmsSelectorType extends UnknownDescribedType {
+
+ public PeerJmsSelectorType(String selector) {
+
super(org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000004L),
selector);
+ }
+ }
+
+ @Test
+ public void
testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelector() throws
Exception {
+ final DescribedType clientJmsSelector = new
AmqpJmsSelectorType("myProperty=42");
+ final Map<String, Object> filters = new HashMap<>();
+ filters.put("jms-selector", clientJmsSelector);
+
+ final PeerJmsSelectorType peerJmsSelector = new
PeerJmsSelectorType("myProperty=42");
+ final Map<String, Object> filtersAtPeer = new HashMap<>();
+ filtersAtPeer.put("jms-selector", peerJmsSelector);
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver()
+ .withSource().withAddress("test-queue")
+ .withDistributionMode("copy")
+ .withTimeout(128)
+
.withDurable(TerminusDurability.UNSETTLED_STATE)
+
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
+ .withDefaultOutcome(new Released())
+ .withCapabilities("QUEUE")
+ .withFilter(filtersAtPeer)
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
.also()
.withTarget().withAddress(notNullValue())
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
index 40023e38..02f97dd6 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
@@ -2253,10 +2253,7 @@ public class SenderTest extends ImperativeClientTestCase
{
@Test
public void testCreateSenderWithUserConfiguredSourceAndTargetOptions()
throws Exception {
- final Map<String, Object> filtersToObject = new HashMap<>();
- filtersToObject.put("x-opt-filter", "a = b");
-
- final Map<String, String> filters = new HashMap<>();
+ final Map<String, Object> filters = new HashMap<>();
filters.put("x-opt-filter", "a = b");
try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -2271,7 +2268,7 @@ public class SenderTest extends ImperativeClientTestCase {
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
.withDefaultOutcome(new Released())
.withCapabilities("QUEUE")
- .withFilter(filtersToObject)
+ .withFilter(filters)
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
.also()
.withTarget().withAddress("test-queue")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]