[ 
https://issues.apache.org/jira/browse/ARTEMIS-4544?focusedWorklogId=896347&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-896347
 ]

ASF GitHub Bot logged work on ARTEMIS-4544:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Dec/23 19:15
            Start Date: 19/Dec/23 19:15
    Worklog Time Spent: 10m 
      Work Description: tabish121 commented on code in PR #4721:
URL: https://github.com/apache/activemq-artemis/pull/4721#discussion_r1431828482


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java:
##########
@@ -182,6 +182,10 @@ public boolean isWritable(ReadyListener readyListener) {
       return connection.isWritable(readyListener);
    }
 
+   public boolean isLargeMessageSync() {
+      return server.getConfiguration().isLargeMessageSync();
+   }
+

Review Comment:
   Extra newline clutter



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java:
##########
@@ -239,7 +243,7 @@ public synchronized AMQPFederation addQueueMatchPolicy(FederationReceiveFromQueu
     * @throws ActiveMQException if an error occurs processing the added policy
     */
    public synchronized AMQPFederation addAddressMatchPolicy(FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException {
-      final FederationAddressPolicyManager manager = new AMQPFederationAddressPolicyManager(this, addressPolicy);
+      final FederationAddressPolicyManager manager = new AMQPFederationAddressPolicyManager(this, addressPolicy, server.getConfiguration().isLargeMessageSync());

Review Comment:
   Adding unneeded constructor args makes the code less readable and less 
maintainable.  The bits that need to inspect this value can simply ask the 
connection or session SPI for the value when it is needed.
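
One way to read this suggestion, as a minimal sketch: the reader keeps only its session SPI reference and asks for the flag at the point of use, so no extra constructor argument or cached field is needed. `SessionSpi` and `LargeMessageReaderSketch` are hypothetical stand-ins, not the actual Artemis classes; only the accessor name `isLargeMessageSync()` is taken from the diff in this review.

```java
// Hypothetical stand-in for the session SPI; only the accessor matters here.
interface SessionSpi {
    boolean isLargeMessageSync();
}

// Hypothetical reader that queries the SPI when the value is needed
// instead of holding its own copy of the configuration flag.
final class LargeMessageReaderSketch {
    private final SessionSpi sessionSpi;

    LargeMessageReaderSketch(SessionSpi sessionSpi) {
        this.sessionSpi = sessionSpi;
    }

    // Returns the flag that would be handed to releaseResources(sync, ...).
    boolean syncFlagForRelease() {
        return sessionSpi.isLargeMessageSync();
    }
}
```

With this shape the constructor signature stays unchanged, and the value is read from the SPI on each call rather than from a copy taken at construction time.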



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java:
##########
@@ -113,17 +113,20 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
    private AMQPFederatedAddressDeliveryReceiver receiver;
    private Receiver protonReceiver;
    private boolean started;
+   private final boolean largeMessageSync;

Review Comment:
   There is no need to track this; it can be queried when needed rather than 
polluting the entire code base with copies of the configuration value.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java:
##########
@@ -113,17 +113,20 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
    private AMQPFederatedAddressDeliveryReceiver receiver;
    private Receiver protonReceiver;
    private boolean started;
+   private final boolean largeMessageSync;
    private volatile boolean closed;
    private Consumer<FederationConsumerInternal> remoteCloseHandler;
 
    public AMQPFederationAddressConsumer(AMQPFederation federation, AMQPFederationConsumerConfiguration configuration,
-                                        AMQPSessionContext session, FederationConsumerInfo consumerInfo, FederationReceiveFromAddressPolicy policy) {
+                                        AMQPSessionContext session, FederationConsumerInfo consumerInfo, FederationReceiveFromAddressPolicy policy,
+                                        boolean largeMessageSync) {

Review Comment:
   Same as above, not needed as it can be queried elsewhere.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java:
##########
@@ -77,6 +77,10 @@ public abstract class AMQPFederation implements FederationInternal {
    protected boolean started;
    protected volatile boolean connected;
 
+   public boolean isLargeMessageSync() {

Review Comment:
   It isn't really necessary to define an accessor here as there are already 
accessors in the Session and Connection SPI bits.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java:
##########
@@ -46,15 +46,17 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
 
    protected final AMQPFederation federation;
    protected final AMQPFederationConsumerConfiguration configuration;
+   private final boolean largeMessageSync;
 
    protected final String remoteQueueFilter;
 
-   public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException {
+   public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy, boolean largeMessageSync) throws ActiveMQException {
       super(federation, addressPolicy);
 
       this.federation = federation;
       this.remoteQueueFilter = generateAddressFilter(policy.getMaxHops());
       this.configuration = new AMQPFederationConsumerConfiguration(federation, policy.getProperties());
+      this.largeMessageSync = largeMessageSync;

Review Comment:
   Same as above, not needed as it can be queried elsewhere.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java:
##########
@@ -46,15 +46,17 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
 
    protected final AMQPFederation federation;
    protected final AMQPFederationConsumerConfiguration configuration;
+   private final boolean largeMessageSync;
 
    protected final String remoteQueueFilter;
 
-   public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException {
+   public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy, boolean largeMessageSync) throws ActiveMQException {

Review Comment:
   Same as above, not needed as it can be queried elsewhere.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java:
##########
@@ -46,15 +46,17 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
 
    protected final AMQPFederation federation;
    protected final AMQPFederationConsumerConfiguration configuration;
+   private final boolean largeMessageSync;

Review Comment:
   Same as above, not needed as it can be queried elsewhere.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java:
##########
@@ -472,7 +472,7 @@ protected MessageReader trySelectMessageReader(Receiver receiver, Delivery deliv
                coreMessageReader : (coreMessageReader = new AMQPTunneledCoreMessageReader(this));
          } else if (delivery.getMessageFormat() == AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT) {
             return coreLargeMessageReader != null ?
-               coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this));
+               coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this, federation.isLargeMessageSync()));

Review Comment:
   Same as above, not needed as it can be queried elsewhere.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java:
##########
@@ -78,6 +78,8 @@ public ProtonAbstractReceiver(AMQPSessionCallback sessionSPI,
       this.creditRunnable = createCreditRunnable(connection);
       useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
       this.routingContext = new RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection());
+      this.standardMessageReader = new AMQPMessageReader(this);
+      this.largeMessageReader = new AMQPLargeMessageReader(this, sessionSPI.isLargeMessgeSync());

Review Comment:
   This change is unnecessary if you just use the API added in the session 
context to check the value.



##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java:
##########
@@ -139,7 +139,7 @@ public void testReadDeliveryAnnotationsFromDeliveryBuffer() throws Exception {
 
    @Test
    public void testReadMessageByteByByteFromDeliveryBuffer() throws Exception {
-      AMQPTunneledCoreLargeMessageReader reader = new AMQPTunneledCoreLargeMessageReader(serverReceiver);
+      AMQPTunneledCoreLargeMessageReader reader = new AMQPTunneledCoreLargeMessageReader(serverReceiver, true);

Review Comment:
   Same as others, use the session API added



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java:
##########
@@ -113,17 +113,20 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
    private AMQPFederatedAddressDeliveryReceiver receiver;
    private Receiver protonReceiver;
    private boolean started;
+   private final boolean largeMessageSync;
    private volatile boolean closed;
    private Consumer<FederationConsumerInternal> remoteCloseHandler;
 
    public AMQPFederationAddressConsumer(AMQPFederation federation, AMQPFederationConsumerConfiguration configuration,
-                                        AMQPSessionContext session, FederationConsumerInfo consumerInfo, FederationReceiveFromAddressPolicy policy) {
+                                        AMQPSessionContext session, FederationConsumerInfo consumerInfo, FederationReceiveFromAddressPolicy policy,
+                                        boolean largeMessageSync) {
       this.federation = federation;
       this.consumerInfo = consumerInfo;
       this.policy = policy;
       this.connection = session.getAMQPConnectionContext();
       this.session = session;
       this.configuration = configuration;
+      this.largeMessageSync = largeMessageSync;

Review Comment:
   Same as above, not needed as it can be queried elsewhere.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java:
##########
@@ -508,7 +511,7 @@ protected MessageReader trySelectMessageReader(Receiver receiver, Delivery deliv
                coreMessageReader : (coreMessageReader = new AMQPTunneledCoreMessageReader(this));
          } else if (delivery.getMessageFormat() == AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT) {
             return coreLargeMessageReader != null ?
-               coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this));
+               coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this, largeMessageSync));

Review Comment:
   Same as above, not needed as it can be queried elsewhere.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java:
##########
@@ -81,7 +83,7 @@ protected FederationConsumerInternal createFederationConsumer(FederationConsumer
 
       // Don't initiate anything yet as the caller might need to register error handlers etc
       // before the attach is sent otherwise they could miss the failure case.
-      return new AMQPFederationAddressConsumer(federation, configuration, federation.getSessionContext(), consumerInfo, policy);
+      return new AMQPFederationAddressConsumer(federation, configuration, federation.getSessionContext(), consumerInfo, policy, largeMessageSync);

Review Comment:
   Same as above, not needed as it can be queried elsewhere.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java:
##########
@@ -204,7 +206,7 @@ public Message readBytes(Delivery delivery) throws Exception {
             final Message result = coreLargeMessage.toMessage();
 
             // We don't want a close to delete the file now, so we release these resources.
-            coreLargeMessage.releaseResources(true, true);
+            coreLargeMessage.releaseResources(largeMessageSync, true);

Review Comment:
   You don't need to track this value, just call the accessor in the Session 
context



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -299,7 +299,7 @@ protected MessageReader trySelectMessageReader(Receiver receiver, Delivery deliv
             coreMessageReader : (coreMessageReader = new AMQPTunneledCoreMessageReader(this));
       } else if (delivery.getMessageFormat() == AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT) {
          return coreLargeMessageReader != null ?
-            coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this));
+            coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this, server.getConfiguration().isLargeMessageSync()));

Review Comment:
   Same as above, not needed as it can be queried elsewhere.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -101,7 +104,7 @@ public Message readBytes(Delivery delivery) throws Exception {
       final AMQPLargeMessage result;
 
       if (!delivery.isPartial()) {
-         currentMessage.releaseResources(true, true);
+         currentMessage.releaseResources(largeMessageSync, true);

Review Comment:
   You don't need to track this value, just call the accessor in the Session 
context



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java:
##########
@@ -46,9 +46,9 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
 
    // Cached instances used for this receiver which will be swapped as message of varying types
    // are sent to this receiver from the remote peer.
-   protected final MessageReader standardMessageReader = new AMQPMessageReader(this);
+   protected final MessageReader standardMessageReader;
+   protected final MessageReader standardMessageReader;

Review Comment:
   Same comment as above, don't change this, just use the accessor in the 
session context



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java:
##########
@@ -98,9 +98,11 @@ private enum State {
    private LargeServerMessage coreLargeMessage;
    private int largeMessageSectionRemaining;
    private State state = State.CLOSED;
+   private final boolean largeMessageSync;

Review Comment:
   You don't need to track this value, just call the accessor in the Session 
context





Issue Time Tracking
-------------------

    Worklog Id:     (was: 896347)
    Time Spent: 20m  (was: 10m)

> Option to not sync large messages
> ---------------------------------
>
>                 Key: ARTEMIS-4544
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4544
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.32.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> When writing large messages we always sync the large message before closing.
> We should have an option to bypass that and trust the OS on that.
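
For illustration only, the behaviour described in the issue could look roughly like the following in plain Java NIO. `LargeMessageFileSketch` and its `sync` parameter are hypothetical and are not taken from the Artemis code base; `FileChannel.force(true)` is the standard JDK call that asks for file content and metadata to be flushed to the storage device.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: writes a message body and only forces it to disk
// before closing when the caller asks for it.
final class LargeMessageFileSketch {
    static void write(Path file, byte[] body, boolean sync) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(body);
            // A single write call may be partial, so loop until drained.
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            if (sync) {
                // Explicit flush to the device before close.
                channel.force(true);
            }
            // With sync == false the data is left to the OS page cache.
        }
    }
}
```

Skipping the `force` call trades durability on a crash or power loss for lower write latency, which is the trade-off the issue proposes to make configurable.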



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
