This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 08237ca2 PROTON-2914 Improve visibility into the Delivery State
outcomes
08237ca2 is described below
commit 08237ca2abe4ad2974cc7b970d3adf3133d76ee3
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Jan 8 12:50:17 2026 -0500
PROTON-2914 Improve visibility into the Delivery State outcomes
Provide some marker interfaces for Accepted, Rejected, Released, Modified
and Transactional
in order to allow for simpler access to information in outcomes like
Modified and Rejected
where the data in those outcomes can be useful for determining the reason
for those.
---
.../org/apache/qpid/protonj2/client/Accepted.java | 33 ++++
.../apache/qpid/protonj2/client/DeliveryState.java | 33 ++++
.../org/apache/qpid/protonj2/client/Modified.java | 51 ++++++
.../org/apache/qpid/protonj2/client/Rejected.java | 50 ++++++
.../org/apache/qpid/protonj2/client/Released.java | 33 ++++
.../apache/qpid/protonj2/client/Transactional.java | 46 ++++++
.../protonj2/client/impl/ClientDeliveryState.java | 120 ++++++++++----
.../client/impl/ClientDeliveryStateTest.java | 180 +++++++++++++++++++++
.../qpid/protonj2/client/impl/ReceiverTest.java | 50 ++++++
.../qpid/protonj2/client/impl/SenderTest.java | 53 ++++++
.../qpid/protonj2/client/impl/SessionTest.java | 1 -
.../protonj2/client/impl/TransactionsTest.java | 20 ++-
.../protonj2/types/transport/ErrorCondition.java | 4 +
.../buffer/impl/ProtonCompositeBufferImplTest.java | 8 -
.../transport/ErrorConditionTypeCodecTest.java | 6 +-
.../types/transport/ErrorConditionTest.java | 6 +-
16 files changed, 649 insertions(+), 45 deletions(-)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Accepted.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Accepted.java
new file mode 100644
index 00000000..4dfa8d06
--- /dev/null
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Accepted.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.protonj2.client;
+
+/**
+ * Accepted delivery state and outcome marker interface
+ */
+public interface Accepted extends DeliveryState {
+
+ @Override
+ default Type getType() {
+ return DeliveryState.Type.ACCEPTED;
+ }
+
+ @Override
+ default boolean isAccepted() {
+ return true;
+ }
+}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/DeliveryState.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/DeliveryState.java
index c7fe2766..f78afb8d 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/DeliveryState.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/DeliveryState.java
@@ -25,6 +25,11 @@ import
org.apache.qpid.protonj2.client.impl.ClientDeliveryState.ClientReleased;
/**
* Conveys the outcome of a Delivery either incoming or outgoing.
+ * <p>
+ * {@link DeliveryState} instance have a type (e.g. Accepted, Rejected ...)
and can in
+ * case of a transactional delivery state, carry an outcome and in those cases
the is
+ * outcome APIs return the value of the nested outcome carried in the
transactional
+ * state.
*/
public interface DeliveryState {
@@ -48,6 +53,34 @@ public interface DeliveryState {
return getType() == Type.ACCEPTED;
}
+ /**
+ * @return true if the {@link DeliveryState} represents an Rejected
outcome.
+ */
+ default boolean isRejected() {
+ return getType() == Type.REJECTED;
+ }
+
+ /**
+ * @return true if the {@link DeliveryState} represents an Modified
outcome.
+ */
+ default boolean isModified() {
+ return getType() == Type.MODIFIED;
+ }
+
+ /**
+ * @return true if the {@link DeliveryState} represents an Released
outcome.
+ */
+ default boolean isReleased() {
+ return getType() == Type.RELEASED;
+ }
+
+ /**
+ * @return true if the {@link DeliveryState} represents an Transactional
outcome.
+ */
+ default boolean isTransactional() {
+ return getType() == Type.TRANSACTIONAL;
+ }
+
//----- Factory methods for default DeliveryState types
/**
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Modified.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Modified.java
new file mode 100644
index 00000000..9f1fd48c
--- /dev/null
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Modified.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.protonj2.client;
+
+import java.util.Map;
+
+/**
+ * Modified delivery state or outcome that carries details on the modification
+ * provided by the remote peer or sent by the local client.
+ */
+public interface Modified extends DeliveryState {
+
+ /**
+ * @return <code>true</code> if the disposition indicates the delivery
failed
+ */
+ boolean isDeliveryFailed();
+
+ /**
+ * @return <code>true</code> if the disposition indicates the delivery
should not be attempted again at the target
+ */
+ boolean isUndeliverableHere();
+
+ /**
+ * @return a Map containing the annotations applied by the peer that
created the disposition
+ */
+ Map<String, Object> getMessageAnnotations();
+
+ @Override
+ default Type getType() {
+ return DeliveryState.Type.MODIFIED;
+ }
+
+ @Override
+ default boolean isModified() {
+ return true;
+ }
+}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Rejected.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Rejected.java
new file mode 100644
index 00000000..e4141fe7
--- /dev/null
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Rejected.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.protonj2.client;
+
+import java.util.Map;
+
+/**
+ * Rejected delivery state and outcome marker interface
+ */
+public interface Rejected extends DeliveryState {
+
+ /**
+ * @return the symbolic value that indicates the error condition
+ */
+ String getCondition();
+
+ /**
+ * @return the error description that provides supplementary details
+ */
+ String getDescription();
+
+ /**
+ * @return a Map carrying additional information about the error condition
+ */
+ Map<String, Object> getInfo();
+
+ @Override
+ default Type getType() {
+ return DeliveryState.Type.REJECTED;
+ }
+
+ @Override
+ default boolean isRejected() {
+ return true;
+ }
+}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Released.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Released.java
new file mode 100644
index 00000000..01339f3b
--- /dev/null
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Released.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.protonj2.client;
+
+/**
+ * Released delivery state and outcome marker interface
+ */
+public interface Released extends DeliveryState {
+
+ @Override
+ default Type getType() {
+ return DeliveryState.Type.RELEASED;
+ }
+
+ @Override
+ default boolean isReleased() {
+ return true;
+ }
+}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Transactional.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Transactional.java
new file mode 100644
index 00000000..bd5f3e54
--- /dev/null
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Transactional.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.protonj2.client;
+
+/**
+ * Transactional {@link DeliveryState} marker interface used when a {@link
Delivery}
+ * or {@link Tracker} is enlisted in a transaction. The transactional delivery
state
+ * carries an outcome that indicates the eventual outcome applied to the
associated
+ * AMQP delivery once the transaction is successfully discharged.
+ * <p>
+ * The API of the transactional delivery-state outcomes will return
<code>true</code>
+ * to indicate the type of outcome {@link Accepted}, {@link Rejected}, {@link
Released}
+ * or {@link Modified}. To inspect the actual assigned outcomes the {@link
#getOutcome()}
+ * method should be called and the resulting {@link DeliveryState} instance
inspected.
+ */
+public interface Transactional extends DeliveryState {
+
+ /**
+ * @return the outcome that will be applied to the delivery once the
transaction is successfully discharged
+ */
+ DeliveryState getOutcome();
+
+ @Override
+ default Type getType() {
+ return DeliveryState.Type.TRANSACTIONAL;
+ }
+
+ @Override
+ default boolean isTransactional() {
+ return true;
+ }
+}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
index 2d7f659a..afdf5155 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
@@ -20,6 +20,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.qpid.protonj2.client.DeliveryState;
+import org.apache.qpid.protonj2.client.Transactional;
import org.apache.qpid.protonj2.types.Symbol;
import org.apache.qpid.protonj2.types.messaging.Accepted;
import org.apache.qpid.protonj2.types.messaging.Modified;
@@ -108,9 +109,9 @@ public abstract class ClientDeliveryState implements
DeliveryState {
case RELEASED:
return Released.getInstance();
case REJECTED:
- return new Rejected(); // TODO - How do we aggregate the
different values into one DeliveryState Object
+ return ClientRejected.fromUnknownClientType(state);
case MODIFIED:
- return new Modified(); // TODO - How do we aggregate the
different values into one DeliveryState Object
+ return ClientModified.fromUnknownClientType(state);
case TRANSACTIONAL:
throw new IllegalArgumentException("Cannot manually enlist
delivery in AMQP Transactions");
default:
@@ -124,7 +125,7 @@ public abstract class ClientDeliveryState implements
DeliveryState {
/**
* Client defined {@link Accepted} delivery state definition
*/
- public static class ClientAccepted extends ClientDeliveryState {
+ public static class ClientAccepted extends ClientDeliveryState implements
org.apache.qpid.protonj2.client.Accepted {
private static final ClientAccepted INSTANCE = new ClientAccepted();
@@ -132,11 +133,6 @@ public abstract class ClientDeliveryState implements
DeliveryState {
// Singleton
}
- @Override
- public Type getType() {
- return Type.ACCEPTED;
- }
-
@Override
org.apache.qpid.protonj2.types.transport.DeliveryState
getProtonDeliveryState() {
return Accepted.getInstance();
@@ -153,7 +149,7 @@ public abstract class ClientDeliveryState implements
DeliveryState {
/**
* Client defined {@link Released} delivery state definition
*/
- public static class ClientReleased extends ClientDeliveryState {
+ public static class ClientReleased extends ClientDeliveryState implements
org.apache.qpid.protonj2.client.Released {
private static final ClientReleased INSTANCE = new ClientReleased();
@@ -161,11 +157,6 @@ public abstract class ClientDeliveryState implements
DeliveryState {
// Singleton
}
- @Override
- public Type getType() {
- return Type.RELEASED;
- }
-
@Override
org.apache.qpid.protonj2.types.transport.DeliveryState
getProtonDeliveryState() {
return Released.getInstance();
@@ -182,7 +173,7 @@ public abstract class ClientDeliveryState implements
DeliveryState {
/**
* Client defined {@link Rejected} delivery state definition
*/
- public static class ClientRejected extends ClientDeliveryState {
+ public static class ClientRejected extends ClientDeliveryState implements
org.apache.qpid.protonj2.client.Rejected {
private final Rejected rejected = new Rejected();
@@ -224,24 +215,58 @@ public abstract class ClientDeliveryState implements
DeliveryState {
}
@Override
- public Type getType() {
- return Type.REJECTED;
+ org.apache.qpid.protonj2.types.transport.DeliveryState
getProtonDeliveryState() {
+ return rejected;
}
@Override
- org.apache.qpid.protonj2.types.transport.DeliveryState
getProtonDeliveryState() {
- return rejected;
+ public String getCondition() {
+ if (rejected != null && rejected.getError() != null &&
rejected.getError().getCondition() != null) {
+ return rejected.getError().getCondition().toString();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public String getDescription() {
+ if (rejected != null && rejected.getError() != null) {
+ return rejected.getError().getDescription();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Map<String, Object> getInfo() {
+ if (rejected != null && rejected.getError() != null) {
+ return
ClientConversionSupport.toStringKeyedMap(rejected.getError().getInfo());
+ } else {
+ return null;
+ }
}
static ClientRejected fromProtonType(Rejected rejected) {
return new ClientRejected(rejected);
}
+
+ static Rejected fromUnknownClientType(DeliveryState deliveryState) {
+ if (deliveryState instanceof
org.apache.qpid.protonj2.client.Rejected) {
+ org.apache.qpid.protonj2.client.Rejected rejectedState =
(org.apache.qpid.protonj2.client.Rejected) deliveryState;
+
+ return new Rejected(new
ErrorCondition(rejectedState.getCondition(),
+
rejectedState.getDescription(),
+
ClientConversionSupport.toSymbolKeyedMap(rejectedState.getInfo())));
+ } else {
+ return new Rejected(); // TODO: This loses data from the
source but remains backwards compatible
+ }
+ }
}
/**
* Client defined {@link Modified} delivery state definition
*/
- public static class ClientModified extends ClientDeliveryState {
+ public static class ClientModified extends ClientDeliveryState implements
org.apache.qpid.protonj2.client.Modified {
private final Modified modified = new Modified();
@@ -283,24 +308,46 @@ public abstract class ClientDeliveryState implements
DeliveryState {
}
@Override
- public Type getType() {
- return Type.MODIFIED;
+ org.apache.qpid.protonj2.types.transport.DeliveryState
getProtonDeliveryState() {
+ return modified;
}
@Override
- org.apache.qpid.protonj2.types.transport.DeliveryState
getProtonDeliveryState() {
- return modified;
+ public boolean isDeliveryFailed() {
+ return modified.isDeliveryFailed();
+ }
+
+ @Override
+ public boolean isUndeliverableHere() {
+ return modified.isUndeliverableHere();
+ }
+
+ @Override
+ public Map<String, Object> getMessageAnnotations() {
+ return
ClientConversionSupport.toStringKeyedMap(modified.getMessageAnnotations());
}
static ClientModified fromProtonType(Modified modified) {
return new ClientModified(modified);
}
+
+ static Modified fromUnknownClientType(DeliveryState deliveryState) {
+ if (deliveryState instanceof
org.apache.qpid.protonj2.client.Modified) {
+ org.apache.qpid.protonj2.client.Modified modifiedState =
(org.apache.qpid.protonj2.client.Modified) deliveryState;
+
+ return new Modified(modifiedState.isDeliveryFailed(),
modifiedState.isUndeliverableHere(),
+
ClientConversionSupport.toSymbolKeyedMap(modifiedState.getMessageAnnotations()));
+ } else {
+ return new Modified(); // TODO: This loses data from the
source but remains backwards compatible
+ }
+ }
+
}
/**
* Client defined {@link TransactionalState} delivery state definition
*/
- public static class ClientTransactional extends ClientDeliveryState {
+ public static class ClientTransactional extends ClientDeliveryState
implements Transactional {
private final TransactionalState txnState = new TransactionalState();
@@ -310,13 +357,28 @@ public abstract class ClientDeliveryState implements
DeliveryState {
}
@Override
- public Type getType() {
- return Type.TRANSACTIONAL;
+ public boolean isAccepted() {
+ return txnState.getOutcome() instanceof Accepted;
}
@Override
- public boolean isAccepted() {
- return txnState.getOutcome() instanceof Accepted;
+ public boolean isRejected() {
+ return txnState.getOutcome() instanceof Rejected;
+ }
+
+ @Override
+ public boolean isReleased() {
+ return txnState.getOutcome() instanceof Released;
+ }
+
+ @Override
+ public boolean isModified() {
+ return txnState.getOutcome() instanceof Modified;
+ }
+
+ @Override
+ public DeliveryState getOutcome() {
+ return fromProtonType(txnState.getOutcome());
}
@Override
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryStateTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryStateTest.java
new file mode 100644
index 00000000..44a2c3db
--- /dev/null
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryStateTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.protonj2.client.impl;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+
+import org.apache.qpid.protonj2.client.DeliveryState;
+import org.apache.qpid.protonj2.client.Transactional;
+import
org.apache.qpid.protonj2.client.impl.ClientDeliveryState.ClientTransactional;
+import org.apache.qpid.protonj2.types.Binary;
+import org.apache.qpid.protonj2.types.Symbol;
+import org.apache.qpid.protonj2.types.messaging.Accepted;
+import org.apache.qpid.protonj2.types.messaging.Modified;
+import org.apache.qpid.protonj2.types.messaging.Outcome;
+import org.apache.qpid.protonj2.types.messaging.Rejected;
+import org.apache.qpid.protonj2.types.messaging.Released;
+import org.apache.qpid.protonj2.types.transactions.TransactionalState;
+import org.apache.qpid.protonj2.types.transport.ErrorCondition;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+@Timeout(20)
+public class ClientDeliveryStateTest {
+
+ @Test
+ public void testAccepted() {
+ final DeliveryState state = DeliveryState.accepted();
+
+ assertNotNull(state);
+ assertEquals(state.getType(), DeliveryState.Type.ACCEPTED);
+ assertTrue(state.isAccepted());
+ assertFalse(state.isRejected());
+ assertFalse(state.isReleased());
+ assertFalse(state.isModified());
+ assertFalse(state.isTransactional());
+ }
+
+ @Test
+ public void testRejected() {
+ final DeliveryState state = DeliveryState.rejected("Error", "Problem");
+
+ assertNotNull(state);
+ assertEquals(state.getType(), DeliveryState.Type.REJECTED);
+ assertFalse(state.isAccepted());
+ assertTrue(state.isRejected());
+ assertFalse(state.isReleased());
+ assertFalse(state.isModified());
+ assertFalse(state.isTransactional());
+ }
+
+ @Test
+ public void testReleased() {
+ final DeliveryState state = DeliveryState.released();
+
+ assertNotNull(state);
+ assertEquals(state.getType(), DeliveryState.Type.RELEASED);
+ assertFalse(state.isAccepted());
+ assertFalse(state.isRejected());
+ assertTrue(state.isReleased());
+ assertFalse(state.isModified());
+ assertFalse(state.isTransactional());
+ }
+
+ @Test
+ public void testModified() {
+ final DeliveryState state = DeliveryState.modified(true, false);
+
+ assertNotNull(state);
+ assertEquals(state.getType(), DeliveryState.Type.MODIFIED);
+ assertFalse(state.isAccepted());
+ assertFalse(state.isRejected());
+ assertFalse(state.isReleased());
+ assertTrue(state.isModified());
+ assertFalse(state.isTransactional());
+ }
+
+ @Test
+ public void testTransactionalWithAccepted() {
+ final Transactional state = new
ClientTransactional(createTransactional(Accepted.getInstance()));
+
+ assertNotNull(state);
+ assertEquals(state.getType(), DeliveryState.Type.TRANSACTIONAL);
+ assertTrue(state.isAccepted());
+ assertFalse(state.isRejected());
+ assertFalse(state.isReleased());
+ assertFalse(state.isModified());
+ assertTrue(state.isTransactional());
+ assertTrue(state.getOutcome() instanceof
org.apache.qpid.protonj2.client.Accepted);
+ }
+
+ @Test
+ public void testTransactionalWithReleased() {
+ final Transactional state = new
ClientTransactional(createTransactional(Released.getInstance()));
+
+ assertNotNull(state);
+ assertEquals(state.getType(), DeliveryState.Type.TRANSACTIONAL);
+ assertFalse(state.isAccepted());
+ assertFalse(state.isRejected());
+ assertTrue(state.isReleased());
+ assertFalse(state.isModified());
+ assertTrue(state.isTransactional());
+ assertTrue(state.getOutcome() instanceof
org.apache.qpid.protonj2.client.Released);
+ }
+
+ @Test
+ public void testTransactionalWithModified() {
+ final Map<Symbol, Object> symbolicAnnotations =
Map.of(Symbol.valueOf("test"), "test");
+ final Map<String, Object> annotations = Map.of("test", "test");
+
+ final Transactional state = new
ClientTransactional(createTransactional(new Modified(true, true,
symbolicAnnotations)));
+
+ assertNotNull(state);
+ assertEquals(state.getType(), DeliveryState.Type.TRANSACTIONAL);
+ assertFalse(state.isAccepted());
+ assertFalse(state.isRejected());
+ assertFalse(state.isReleased());
+ assertTrue(state.isModified());
+ assertTrue(state.isTransactional());
+ assertTrue(state.getOutcome() instanceof
org.apache.qpid.protonj2.client.Modified);
+
+ org.apache.qpid.protonj2.client.Modified modifiedState =
(org.apache.qpid.protonj2.client.Modified) state.getOutcome();
+
+ assertNotNull(modifiedState);
+ assertTrue(modifiedState.isDeliveryFailed());
+ assertTrue(modifiedState.isUndeliverableHere());
+ assertEquals(annotations, modifiedState.getMessageAnnotations());
+ }
+
+ @Test
+ public void testTransactionalWithRejected() {
+ final Map<Symbol, Object> symvolicInfo =
Map.of(Symbol.valueOf("test"), "test");
+ final Map<String, Object> info = Map.of("test", "test");
+
+ final Transactional state = new
ClientTransactional(createTransactional(new Rejected(new ErrorCondition("test",
"data", symvolicInfo))));
+
+ assertNotNull(state);
+ assertEquals(state.getType(), DeliveryState.Type.TRANSACTIONAL);
+ assertFalse(state.isAccepted());
+ assertTrue(state.isRejected());
+ assertFalse(state.isReleased());
+ assertFalse(state.isModified());
+ assertTrue(state.isTransactional());
+ assertTrue(state.getOutcome() instanceof
org.apache.qpid.protonj2.client.Rejected);
+
+ org.apache.qpid.protonj2.client.Rejected rejectedState =
(org.apache.qpid.protonj2.client.Rejected) state.getOutcome();
+
+ assertNotNull(rejectedState);
+ assertEquals("test", rejectedState.getCondition());
+ assertEquals("data", rejectedState.getDescription());
+ assertEquals(info, rejectedState.getInfo());
+ }
+
+ private TransactionalState createTransactional(Outcome outcome) {
+ final TransactionalState txnState = new TransactionalState();
+
+ txnState.setTxnId(new Binary(new byte[] { 0, 1, 2, 3}));
+ txnState.setOutcome(outcome);
+
+ return txnState;
+ }
+}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index d64ef8df..6a20756e 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -3398,4 +3398,54 @@ public class ReceiverTest extends
ImperativeClientTestCase {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testReceiveMessageAndSendModifiedDispositionToRemote() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow();
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withDeliveryTag(new byte[] { 1 })
+ .withMore(false)
+ .withMessageFormat(0)
+ .withMessage().withBody().withData((byte[])
null)
+ .also()
+ .queue();
+ peer.expectDisposition().withFirst(0)
+ .withSettled(false)
+ .withState().modified(false, false,
Map.of("annotation", "value"));
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ final Client container = Client.create();
+ final ConnectionOptions options = new ConnectionOptions();
+ final ReceiverOptions receiverOptions = new
ReceiverOptions().autoSettle(false).autoAccept(false);
+ final Connection connection =
container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+ final Receiver receiver = connection.openReceiver("test-queue",
receiverOptions);
+ final Delivery delivery = receiver.receive();
+ final Message<byte[]> message = delivery.message();
+
+ assertNull(message.body());
+
+ delivery.disposition(DeliveryState.modified(false, false,
Map.of("annotation", "value")), false);
+
+ peer.waitForScriptToComplete();
+ peer.expectDetach().respond();
+ peer.expectClose().respond();
+
+ assertNotNull(delivery);
+
+ receiver.close();
+ connection.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
index a7fefc4a..f54e2054 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
@@ -2920,4 +2920,57 @@ public class SenderTest extends ImperativeClientTestCase
{
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testSendIsRejectedAndTrackerReflectsState() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respond();
+ peer.remoteFlow().withLinkCredit(1).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort()).openFuture().get();
+
+ Session session = connection.openSession().openFuture().get();
+ SenderOptions options = new
SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE).autoSettle(false);
+ Sender sender = session.openSender("test-tags",
options).openFuture().get();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().withNonNullPayload()
+ .withDeliveryTag(new byte[] {0})
+ .respond()
+ .withSettled(true)
+ .withState().rejected("no-space", "disk
full", Map.of("test", "test"));
+ peer.expectDetach().respond();
+ peer.expectClose().respond();
+
+ final Message<String> message = Message.create("Hello World");
+ final Tracker tracker = sender.send(message);
+
+ assertNotNull(tracker);
+ assertNotNull(tracker.settlementFuture().get().settled());
+ assertEquals(DeliveryState.Type.REJECTED,
tracker.remoteState().getType());
+
+ final org.apache.qpid.protonj2.client.Rejected rejected =
+ (org.apache.qpid.protonj2.client.Rejected)
tracker.remoteState();
+
+ assertNotNull(rejected);
+ assertEquals("no-space", rejected.getCondition());
+ assertEquals("disk full", rejected.getDescription());
+ assertEquals(Map.of("test", "test"), rejected.getInfo());
+
+ sender.closeAsync().get(10, TimeUnit.SECONDS);
+
+ connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java
index 148eb9d0..0d4cab58 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java
@@ -561,7 +561,6 @@ public class SessionTest extends ImperativeClientTestCase {
}
}
- @SuppressWarnings("resource")
@Test
public void testCannotCreateResourcesFromClosedSession() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
index 1a640daa..dbd4f9d9 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
@@ -42,6 +42,7 @@ import org.apache.qpid.protonj2.client.Session;
import org.apache.qpid.protonj2.client.StreamSender;
import org.apache.qpid.protonj2.client.StreamSenderMessage;
import org.apache.qpid.protonj2.client.Tracker;
+import org.apache.qpid.protonj2.client.Transactional;
import
org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
@@ -63,10 +64,11 @@ import org.apache.qpid.protonj2.types.messaging.Released;
import org.apache.qpid.protonj2.types.transactions.TransactionErrors;
import org.apache.qpid.protonj2.types.transport.AmqpError;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-//@Timeout(30)
+@Timeout(30)
public class TransactionsTest extends ImperativeClientTestCase {
private static final Logger LOG =
LoggerFactory.getLogger(TransactionsTest.class);
@@ -970,6 +972,8 @@ public class TransactionsTest extends
ImperativeClientTestCase {
assertNotNull(tracker);
assertNotNull(tracker.awaitAccepted());
assertTrue(tracker.remoteState().isAccepted());
+ assertTrue(tracker.remoteState().isTransactional());
+ assertTrue(tracker.state().isTransactional());
assertEquals(tracker.remoteState().getType(),
DeliveryState.Type.TRANSACTIONAL,
"Delivery inside transaction should have
Transactional state");
assertEquals(tracker.state().getType(),
DeliveryState.Type.TRANSACTIONAL,
@@ -1034,6 +1038,7 @@ public class TransactionsTest extends
ImperativeClientTestCase {
assertNotNull(tracker.settlementFuture().get());
assertEquals(tracker.remoteState().getType(),
DeliveryState.Type.TRANSACTIONAL);
assertNotNull(tracker.state());
+ assertTrue(tracker.remoteState().isTransactional());
assertEquals(tracker.state().getType(),
DeliveryState.Type.TRANSACTIONAL,
"Delivery inside transaction should have Transactional
state: " + tracker.state().getType());
Wait.assertTrue("Delivery in transaction should be locally
settled after response", () -> tracker.settled());
@@ -1277,6 +1282,7 @@ public class TransactionsTest extends
ImperativeClientTestCase {
final Tracker tracker =
sender.send(Message.create("test-message"));
assertNotNull(tracker.settlementFuture().get());
assertEquals(tracker.remoteState().getType(),
DeliveryState.Type.TRANSACTIONAL);
+ assertTrue(tracker.remoteState().isTransactional());
try {
session.commitTransaction();
@@ -1597,6 +1603,12 @@ public class TransactionsTest extends
ImperativeClientTestCase {
delivery2.release();
session.commitTransaction();
+
+ assertFalse(delivery1.state().isReleased());
+ assertTrue(delivery1.state().isAccepted());
+ assertTrue(delivery2.state().isReleased());
+ assertFalse(delivery2.state().isAccepted());
+
receiver.closeAsync();
connection.closeAsync().get();
@@ -1659,6 +1671,12 @@ public class TransactionsTest extends
ImperativeClientTestCase {
receiver.closeAsync();
connection.closeAsync().get();
+ assertTrue(delivery.state() instanceof
org.apache.qpid.protonj2.client.Transactional);
+
+ final Transactional txn = (Transactional) delivery.state();
+
+ assertTrue(txn.getOutcome() instanceof
org.apache.qpid.protonj2.client.Modified);
+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/types/transport/ErrorCondition.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/types/transport/ErrorCondition.java
index acdce6c5..180d4e4e 100644
---
a/protonj2/src/main/java/org/apache/qpid/protonj2/types/transport/ErrorCondition.java
+++
b/protonj2/src/main/java/org/apache/qpid/protonj2/types/transport/ErrorCondition.java
@@ -39,6 +39,10 @@ public final class ErrorCondition {
this(condition, description, null);
}
+ public ErrorCondition(String condition, String description, Map<Symbol,
Object> info) {
+ this(Symbol.valueOf(condition), description, info);
+ }
+
public ErrorCondition(Symbol condition, String description, Map<Symbol,
Object> info) {
this.condition = condition;
this.description = description;
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImplTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImplTest.java
index 024226fe..e91fc45f 100644
---
a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImplTest.java
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImplTest.java
@@ -475,7 +475,6 @@ public class ProtonCompositeBufferImplTest extends
ProtonAbstractBufferTest {
}
}
- @SuppressWarnings("resource")
@Test
public void testCompositeBuffersCannotHaveDuplicateComponents() {
try (ProtonBufferAllocator allocator = createProtonDefaultAllocator())
{
@@ -545,7 +544,6 @@ public class ProtonCompositeBufferImplTest extends
ProtonAbstractBufferTest {
}
}
- @SuppressWarnings("resource")
@Test
@Disabled
public void testCompositeBufferMustNotBeAllowedToContainThemselves() {
@@ -597,7 +595,6 @@ public class ProtonCompositeBufferImplTest extends
ProtonAbstractBufferTest {
}
}
- @SuppressWarnings("resource")
@Test
public void testAppendingCompositeBufferToItselfMustThrow() {
try (ProtonBufferAllocator allocator = createProtonDefaultAllocator())
{
@@ -912,7 +909,6 @@ public class ProtonCompositeBufferImplTest extends
ProtonAbstractBufferTest {
}
}
- @SuppressWarnings("resource")
@Test
public void testComposingReadOnlyAndWritableBuffersMustThrow() {
try (ProtonBufferAllocator allocator = createProtonDefaultAllocator())
{
@@ -941,7 +937,6 @@ public class ProtonCompositeBufferImplTest extends
ProtonAbstractBufferTest {
}
}
- @SuppressWarnings("resource")
@Test
public void
testCompositeWritableBufferCannotBeExtendedWithReadOnlyBuffer() {
try (ProtonBufferAllocator allocator = createProtonDefaultAllocator();
@@ -953,7 +948,6 @@ public class ProtonCompositeBufferImplTest extends
ProtonAbstractBufferTest {
}
}
- @SuppressWarnings("resource")
@Test
public void
testCompositeReadOnlyBufferCannotBeExtendedWithWritableBuffer() {
try (ProtonBufferAllocator allocator = createProtonDefaultAllocator();
@@ -1599,7 +1593,6 @@ public class ProtonCompositeBufferImplTest extends
ProtonAbstractBufferTest {
}
}
- @SuppressWarnings("resource")
@Test
public void testSplitComponentsFloorMustThrowOnOutOfBounds() {
try (ProtonBufferAllocator allocator = createProtonDefaultAllocator();
@@ -1676,7 +1669,6 @@ public class ProtonCompositeBufferImplTest extends
ProtonAbstractBufferTest {
}
}
- @SuppressWarnings("resource")
@Test
public void testSplitComponentsCeilMustThrowOnOutOfBounds() {
try (ProtonBufferAllocator allocator = createProtonDefaultAllocator();
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/transport/ErrorConditionTypeCodecTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/transport/ErrorConditionTypeCodecTest.java
index 0240b865..3cb030a4 100644
---
a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/transport/ErrorConditionTypeCodecTest.java
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/transport/ErrorConditionTypeCodecTest.java
@@ -165,14 +165,14 @@ public class ErrorConditionTypeCodecTest extends
CodecTestSupport {
@Test
public void testEqualityOfNewlyConstructed() {
- ErrorCondition new1 = new ErrorCondition(null, null, null);
- ErrorCondition new2 = new ErrorCondition(null, null, null);
+ ErrorCondition new1 = new ErrorCondition((String) null, null, null);
+ ErrorCondition new2 = new ErrorCondition((String) null, null, null);
assertErrorConditionsEqual(new1, new2);
}
@Test
public void testSameObject() {
- ErrorCondition error = new ErrorCondition(null, null, null);
+ ErrorCondition error = new ErrorCondition((String) null, null, null);
assertErrorConditionsEqual(error, error);
}
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/types/transport/ErrorConditionTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/types/transport/ErrorConditionTest.java
index eb3fc7f4..b4843a60 100644
---
a/protonj2/src/test/java/org/apache/qpid/protonj2/types/transport/ErrorConditionTest.java
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/types/transport/ErrorConditionTest.java
@@ -51,11 +51,11 @@ public class ErrorConditionTest {
assertEquals(original, copy);
Map<Symbol, Object> infoMap = new HashMap<>();
- ErrorCondition other1 = new ErrorCondition(null, "error", infoMap);
+ ErrorCondition other1 = new ErrorCondition((String) null, "error",
infoMap);
ErrorCondition other2 = new ErrorCondition(AmqpError.DECODE_ERROR,
null, infoMap);
ErrorCondition other3 = new ErrorCondition(AmqpError.DECODE_ERROR,
"error", infoMap);
- ErrorCondition other4 = new ErrorCondition(null, null, infoMap);
- ErrorCondition other5 = new ErrorCondition(null, null, null);
+ ErrorCondition other4 = new ErrorCondition((String) null, null,
infoMap);
+ ErrorCondition other5 = new ErrorCondition((Symbol) null, null, null);
assertNotEquals(original, other1);
assertNotEquals(original, other2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]