This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new a5712ccd PROTON-2616 Relax filters API to accept Map with object 
entries
a5712ccd is described below

commit a5712ccd2370cc066ca4c11c3c1c17001d824fcd
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Oct 4 18:37:20 2022 -0400

    PROTON-2616 Relax filters API to accept Map with object entries
    
    Allows for AMQP types or custom DescribedType entries in the map which
    allows for definition of JMS AMQP style selectors or other custom
    selector types based on the remote handling of selectors.
---
 .../apache/qpid/protonj2/client/SourceOptions.java |   6 +-
 .../qpid/protonj2/client/impl/ReceiverTest.java    | 113 ++++++++++++++++++++-
 .../qpid/protonj2/client/impl/SenderTest.java      |   7 +-
 3 files changed, 113 insertions(+), 13 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
index 51def8aa..0ade3a1c 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
@@ -39,7 +39,7 @@ public final class SourceOptions extends 
TerminusOptions<SourceOptions> implemen
     private DistributionMode distributionMode;
     private DeliveryState defaultOutcome;
     private DeliveryState.Type[] outcomes = DEFAULT_OUTCOMES;
-    private Map<String, String> filters;
+    private Map<String, Object> filters;
 
     @Override
     public SourceOptions clone() {
@@ -85,7 +85,7 @@ public final class SourceOptions extends 
TerminusOptions<SourceOptions> implemen
     /**
      * @return the filters
      */
-    public Map<String, String> filters() {
+    public Map<String, Object> filters() {
         return filters;
     }
 
@@ -94,7 +94,7 @@ public final class SourceOptions extends 
TerminusOptions<SourceOptions> implemen
      *
      * @return this {@link SourceOptions} instance.
      */
-    public SourceOptions filters(Map<String, String> filters) {
+    public SourceOptions filters(Map<String, Object> filters) {
         this.filters = filters;
         return self();
     }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index d3fbae21..9c7adcc4 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -70,6 +70,9 @@ import 
org.apache.qpid.protonj2.test.driver.codec.messaging.Modified;
 import org.apache.qpid.protonj2.test.driver.codec.messaging.Released;
 import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusDurability;
 import 
org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusExpiryPolicy;
+import 
org.apache.qpid.protonj2.test.driver.codec.primitives.UnknownDescribedType;
+import org.apache.qpid.protonj2.types.DescribedType;
+import org.apache.qpid.protonj2.types.UnsignedLong;
 import org.apache.qpid.protonj2.types.messaging.AmqpValue;
 import org.apache.qpid.protonj2.types.messaging.Data;
 import org.apache.qpid.protonj2.types.messaging.Section;
@@ -2527,10 +2530,7 @@ public class ReceiverTest extends 
ImperativeClientTestCase {
 
     @Test
     public void testCreateReceiverWithUserConfiguredSourceAndTargetOptions() 
throws Exception {
-        final Map<String, Object> filtersToObject = new HashMap<>();
-        filtersToObject.put("x-opt-filter", "a = b");
-
-        final Map<String, String> filters = new HashMap<>();
+        final Map<String, Object> filters = new HashMap<>();
         filters.put("x-opt-filter", "a = b");
 
         try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -2545,7 +2545,110 @@ public class ReceiverTest extends 
ImperativeClientTestCase {
                                             
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
                                             .withDefaultOutcome(new Released())
                                             .withCapabilities("QUEUE")
-                                            .withFilter(filtersToObject)
+                                            .withFilter(filters)
+                                            
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
+                                            .also()
+                               .withTarget().withAddress(notNullValue())
+                                            .withCapabilities("QUEUE")
+                                            
.withDurable(TerminusDurability.CONFIGURATION)
+                                            
.withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
+                                            .withTimeout(42)
+                                            .withDynamic(anyOf(nullValue(), 
equalTo(false)))
+                                            
.withDynamicNodeProperties(nullValue())
+                               .and().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectDetach().respond();
+            peer.expectEnd().respond();
+            peer.expectClose().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort());
+            Session session = connection.openSession();
+            ReceiverOptions receiverOptions = new ReceiverOptions();
+
+            receiverOptions.sourceOptions().capabilities("QUEUE");
+            
receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY);
+            receiverOptions.sourceOptions().timeout(128);
+            
receiverOptions.sourceOptions().durabilityMode(DurabilityMode.UNSETTLED_STATE);
+            
receiverOptions.sourceOptions().expiryPolicy(ExpiryPolicy.CONNECTION_CLOSE);
+            
receiverOptions.sourceOptions().defaultOutcome(DeliveryState.released());
+            receiverOptions.sourceOptions().filters(filters);
+            
receiverOptions.sourceOptions().outcomes(DeliveryState.Type.ACCEPTED, 
DeliveryState.Type.REJECTED);
+
+            receiverOptions.targetOptions().capabilities("QUEUE");
+            
receiverOptions.targetOptions().durabilityMode(DurabilityMode.CONFIGURATION);
+            
receiverOptions.targetOptions().expiryPolicy(ExpiryPolicy.SESSION_CLOSE);
+            receiverOptions.targetOptions().timeout(42);
+
+            Receiver receiver = session.openReceiver("test-queue", 
receiverOptions).openFuture().get();
+
+            receiver.close();
+            session.close();
+            connection.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    private class AmqpJmsSelectorType implements DescribedType {
+
+        private final String selector;
+
+        public AmqpJmsSelectorType(String selector) {
+            this.selector = selector;
+        }
+
+        @Override
+        public Object getDescriptor() {
+            return UnsignedLong.valueOf(0x0000468C00000004L);
+        }
+
+        @Override
+        public Object getDescribed() {
+            return this.selector;
+        }
+
+        @Override
+        public String toString() {
+            return "AmqpJmsSelectorType{" + selector + "}";
+        }
+    }
+
+    private class PeerJmsSelectorType extends UnknownDescribedType {
+
+        public PeerJmsSelectorType(String selector) {
+            
super(org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000004L),
 selector);
+        }
+    }
+
+    @Test
+    public void 
testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelector() throws 
Exception {
+        final DescribedType clientJmsSelector = new 
AmqpJmsSelectorType("myProperty=42");
+        final Map<String, Object> filters = new HashMap<>();
+        filters.put("jms-selector", clientJmsSelector);
+
+        final PeerJmsSelectorType peerJmsSelector = new 
PeerJmsSelectorType("myProperty=42");
+        final Map<String, Object> filtersAtPeer = new HashMap<>();
+        filtersAtPeer.put("jms-selector", peerJmsSelector);
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver()
+                               .withSource().withAddress("test-queue")
+                                            .withDistributionMode("copy")
+                                            .withTimeout(128)
+                                            
.withDurable(TerminusDurability.UNSETTLED_STATE)
+                                            
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
+                                            .withDefaultOutcome(new Released())
+                                            .withCapabilities("QUEUE")
+                                            .withFilter(filtersAtPeer)
                                             
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
                                             .also()
                                .withTarget().withAddress(notNullValue())
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
index 40023e38..02f97dd6 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
@@ -2253,10 +2253,7 @@ public class SenderTest extends ImperativeClientTestCase 
{
 
     @Test
     public void testCreateSenderWithUserConfiguredSourceAndTargetOptions() 
throws Exception {
-        final Map<String, Object> filtersToObject = new HashMap<>();
-        filtersToObject.put("x-opt-filter", "a = b");
-
-        final Map<String, String> filters = new HashMap<>();
+        final Map<String, Object> filters = new HashMap<>();
         filters.put("x-opt-filter", "a = b");
 
         try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -2271,7 +2268,7 @@ public class SenderTest extends ImperativeClientTestCase {
                                             
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
                                             .withDefaultOutcome(new Released())
                                             .withCapabilities("QUEUE")
-                                            .withFilter(filtersToObject)
+                                            .withFilter(filters)
                                             
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
                                             .also()
                                .withTarget().withAddress("test-queue")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to