[
https://issues.apache.org/jira/browse/ARTEMIS-4771?focusedWorklogId=920697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-920697
]
ASF GitHub Bot logged work on ARTEMIS-4771:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 23/May/24 15:26
Start Date: 23/May/24 15:26
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4942:
URL: https://github.com/apache/activemq-artemis/pull/4942#discussion_r1611907602
##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriterTest.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.activemq.artemis.core.message.LargeBodyReader;
+import
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import
org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+
+/**
+ * Tests for the AMQP Large Message Writer
+ */
+public class AMQPLargeMessageWriterTest {
+
+ @Mock
+ ProtonServerSenderContext serverSender;
+
+ @Mock
+ Sender protonSender;
+
+ @Mock
+ Session protonSession;
+
+ @Mock
+ Connection protonConnection;
+
+ @Mock
+ TransportImpl protonTransport;
+
+ @Mock
+ Delivery protonDelivery;
+
+ @Mock
+ MessageReference reference;
+
+ @Mock
+ AMQPLargeMessage message;
+
+ @Mock
+ LargeBodyReader bodyReader;
+
+ @Mock
+ AMQPConnectionContext connectionContext;
+
+ @Mock
+ AMQPSessionContext sessionContext;
+
+ @Mock
+ AMQPSessionCallback sessionSPI;
+
+ @Mock
+ org.apache.activemq.artemis.spi.core.remoting.Connection
transportConnection;
+
+ @Mock
+ ActiveMQProtonRemotingConnection remotingConnection;
+
+ @Spy
+ NullStorageManager nullStoreManager = new NullStorageManager();
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+
+ when(serverSender.getSessionContext()).thenReturn(sessionContext);
+ when(serverSender.getSender()).thenReturn(protonSender);
+ when(serverSender.createDelivery(any(),
anyInt())).thenReturn(protonDelivery);
+
+ when(protonSender.getSession()).thenReturn(protonSession);
+ when(protonSession.getConnection()).thenReturn(protonConnection);
+ when(protonConnection.getTransport()).thenReturn(protonTransport);
+ when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(65535);
+
+
when(transportConnection.getProtocolConnection()).thenReturn(remotingConnection);
+ when(reference.getMessage()).thenReturn(message);
+ when(message.isLargeMessage()).thenReturn(true);
+ when(message.getLargeBodyReader()).thenReturn(bodyReader);
+ when(sessionContext.getSessionSPI()).thenReturn(sessionSPI);
+
when(sessionContext.getAMQPConnectionContext()).thenReturn(connectionContext);
+ when(connectionContext.flowControl(any())).thenReturn(true);
+ when(sessionSPI.getStorageManager()).thenReturn(nullStoreManager);
+
when(sessionSPI.getTransportConnection()).thenReturn(transportConnection);
+ }
+
+ @Test
+ public void testWriterThrowsIllegalStateIfNotOpenedWhenWriteCalled() throws
Exception {
+ AMQPLargeMessageWriter writer = new AMQPLargeMessageWriter(serverSender);
+
+ try {
+ writer.writeBytes(reference);
+ fail("Should throw as the writer was not opened.");
+ } catch (IllegalStateException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testNoWritesWhenProtonSenderIsLocallyClosed() throws Exception {
+ AMQPLargeMessageWriter writer = new AMQPLargeMessageWriter(serverSender);
+
+ when(protonSender.getLocalState()).thenReturn(EndpointState.CLOSED);
+
+ writer.open(reference);
+
+ try {
+ writer.writeBytes(reference);
+ } catch (IllegalStateException e) {
+ fail("Should not throw as link was closed before write actioned.");
+ }
+
+ verify(reference).getMessage();
+ verify(message).usageUp();
+ verify(protonSender).getLocalState();
+
+ verifyNoMoreInteractions(reference);
+ verifyNoInteractions(protonDelivery);
+ }
+
+ @Test
+ public void testLargeMessageUsageLoweredOnCloseWhenWriteNotCompleted()
throws Exception {
+ AMQPLargeMessageWriter writer = new AMQPLargeMessageWriter(serverSender);
+
+ writer.open(reference);
+
+ when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE);
+ when(protonDelivery.isPartial()).thenReturn(true);
+
+ // The writer will wait for flow control to resume it
+ when(connectionContext.flowControl(any())).thenReturn(false);
+
+ verify(message).usageUp();
+
+ try {
+ writer.writeBytes(reference);
+ } catch (IllegalStateException e) {
+ fail("Should not throw as the delivery is completed so no data should
be written.");
+ }
+
+ verify(message, Mockito.never()).usageDown();
+ verify(reference).getMessage();
+ verifyNoMoreInteractions(reference);
+
+ try {
+ writer.close();
+ } catch (IllegalStateException e) {
+ fail("Should not throw as the close when write wasn't completed.");
+ }
+
+ verify(message).usageDown();
+ verify(protonSender).getSession();
+ verify(protonDelivery).getTag();
+ verify(protonSender, atLeastOnce()).getLocalState();
+
+ verifyNoMoreInteractions(reference);
+ verifyNoMoreInteractions(protonDelivery);
+ }
+
+ @Test
+ public void testTryDeliveringRunAfterClosedDoesNotThrow() throws Exception {
+ AMQPLargeMessageWriter writer = new AMQPLargeMessageWriter(serverSender);
+
+ writer.open(reference);
+
+ when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE);
+ when(protonDelivery.isPartial()).thenReturn(true);
+
+ // The writer will wait for flow control to resume it
+ when(connectionContext.flowControl(any())).then(invocation -> {
+ ReadyListener listener = invocation.getArgument(0);
+
+ writer.close();
+
+ try {
+ listener.readyForWriting();
+ } catch (Exception e) {
+ fail("Pending deliver should no-op if closed before completion.");
+ }
+
+ return false;
+ });
+
+ doAnswer(invocation -> {
+ Runnable runnable = invocation.getArgument(0);
+
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ fail("Queued invocation of writer tasks should not throw.");
+ }
+
+ return null;
+ }).when(connectionContext).runLater(any(Runnable.class));
Review Comment:
Checkstyle failed on this line: it is missing one space.
Issue Time Tracking
-------------------
Worklog Id: (was: 920697)
Time Spent: 2h 50m (was: 2h 40m)
> NPE between AMQPLargeMessageWriter::tryDelivering and resetClose
> ----------------------------------------------------------------
>
> Key: ARTEMIS-4771
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4771
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Clebert Suconic
> Priority: Major
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> This was observed using Red Hat's build (2.33.0.redhat-00009):
> java.lang.NullPointerException: Cannot invoke "org.apache.qpid.proton.engine.Delivery.getTag()" because "this.delivery" is null
> at
> org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter.tryDelivering(AMQPLargeMessageWriter.java:174)
> ~[artemis-amqp-protocol-2.33.0.redhat-00009.jar:2.33.0.redhat-00009]
> at
> io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
> ~[netty-common-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001]
> at
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
> [netty-common-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001]
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
> [netty-common-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001]
> at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
> [netty-transport-classes-epoll-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001]
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
> [netty-common-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001]
> at
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> [netty-common-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001]
> at
> org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> [artemis-commons-2.33.0.redhat-00009.jar:2.33.0.redhat-00009]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)