[ https://issues.apache.org/jira/browse/ARTEMIS-4544?focusedWorklogId=896347&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-896347 ]
ASF GitHub Bot logged work on ARTEMIS-4544:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 19/Dec/23 19:15
Start Date: 19/Dec/23 19:15
Worklog Time Spent: 10m
Work Description: tabish121 commented on code in PR #4721:
URL: https://github.com/apache/activemq-artemis/pull/4721#discussion_r1431828482
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java:
##########
@@ -182,6 +182,10 @@ public boolean isWritable(ReadyListener readyListener) {
return connection.isWritable(readyListener);
}
+ public boolean isLargeMessageSync() {
+ return server.getConfiguration().isLargeMessageSync();
+ }
+
Review Comment:
Extra newline clutter
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java:
##########
@@ -239,7 +243,7 @@ public synchronized AMQPFederation addQueueMatchPolicy(FederationReceiveFromQueu
* @throws ActiveMQException if an error occurs processing the added policy
*/
public synchronized AMQPFederation addAddressMatchPolicy(FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException {
- final FederationAddressPolicyManager manager = new AMQPFederationAddressPolicyManager(this, addressPolicy);
+ final FederationAddressPolicyManager manager = new AMQPFederationAddressPolicyManager(this, addressPolicy, server.getConfiguration().isLargeMessageSync());
Review Comment:
Adding unneeded constructor args makes the code less readable and less
maintainable. The code that needs to inspect this value can simply ask the
connection or session SPI for it at the point where it is needed.
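
A rough sketch of the suggestion above, for illustration only. `BrokerConfig`, `SessionSpi`, and `LargeMessageReaderSketch` are hypothetical stand-ins, not the actual Artemis classes. The reader holds only the session reference it presumably already needs and asks for the flag at the point of use, so no extra constructor argument or cached field is required.

```java
// Hypothetical stand-in for the broker configuration (not the Artemis Configuration API).
interface BrokerConfig {
    boolean isLargeMessageSync();
}

// Hypothetical stand-in for a session SPI that already exposes the setting.
final class SessionSpi {
    private final BrokerConfig config;

    SessionSpi(BrokerConfig config) {
        this.config = config;
    }

    boolean isLargeMessageSync() {
        return config.isLargeMessageSync();
    }
}

// Hypothetical reader: no largeMessageSync field and no extra constructor argument.
final class LargeMessageReaderSketch {
    private final SessionSpi session;

    LargeMessageReaderSketch(SessionSpi session) {
        this.session = session;
    }

    // Queries the value on demand; this is the flag that would be
    // handed to a releaseResources(...) style call.
    boolean syncFlagForRelease() {
        return session.isLargeMessageSync();
    }
}
```

Because the value is read through the SPI each time, no constructor signature changes when the setting is introduced.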
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java:
##########
@@ -113,17 +113,20 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
private AMQPFederatedAddressDeliveryReceiver receiver;
private Receiver protonReceiver;
private boolean started;
+ private final boolean largeMessageSync;
Review Comment:
There is no need to track this; it can be queried when needed rather than
polluting the entire code base with copies of the configuration value.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java:
##########
@@ -113,17 +113,20 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
private AMQPFederatedAddressDeliveryReceiver receiver;
private Receiver protonReceiver;
private boolean started;
+ private final boolean largeMessageSync;
private volatile boolean closed;
private Consumer<FederationConsumerInternal> remoteCloseHandler;
public AMQPFederationAddressConsumer(AMQPFederation federation, AMQPFederationConsumerConfiguration configuration,
- AMQPSessionContext session, FederationConsumerInfo consumerInfo, FederationReceiveFromAddressPolicy policy) {
+ AMQPSessionContext session, FederationConsumerInfo consumerInfo, FederationReceiveFromAddressPolicy policy,
+ boolean largeMessageSync) {
Review Comment:
Same as above, not needed as it can be queried elsewhere.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java:
##########
@@ -77,6 +77,10 @@ public abstract class AMQPFederation implements FederationInternal {
protected boolean started;
protected volatile boolean connected;
+ public boolean isLargeMessageSync() {
Review Comment:
It isn't really necessary to define an accessor here as there are already
accessors in the Session and Connection SPI bits.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java:
##########
@@ -46,15 +46,17 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
protected final AMQPFederation federation;
protected final AMQPFederationConsumerConfiguration configuration;
+ private final boolean largeMessageSync;
protected final String remoteQueueFilter;
- public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException {
+ public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy, boolean largeMessageSync) throws ActiveMQException {
super(federation, addressPolicy);
this.federation = federation;
this.remoteQueueFilter = generateAddressFilter(policy.getMaxHops());
this.configuration = new AMQPFederationConsumerConfiguration(federation, policy.getProperties());
+ this.largeMessageSync = largeMessageSync;
Review Comment:
Same as above, not needed as it can be queried elsewhere.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java:
##########
@@ -46,15 +46,17 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
protected final AMQPFederation federation;
protected final AMQPFederationConsumerConfiguration configuration;
+ private final boolean largeMessageSync;
protected final String remoteQueueFilter;
- public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException {
+ public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy, boolean largeMessageSync) throws ActiveMQException {
Review Comment:
Same as above, not needed as it can be queried elsewhere.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java:
##########
@@ -46,15 +46,17 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
protected final AMQPFederation federation;
protected final AMQPFederationConsumerConfiguration configuration;
+ private final boolean largeMessageSync;
Review Comment:
Same as above, not needed as it can be queried elsewhere.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java:
##########
@@ -472,7 +472,7 @@ protected MessageReader trySelectMessageReader(Receiver receiver, Delivery deliv
coreMessageReader : (coreMessageReader = new AMQPTunneledCoreMessageReader(this));
} else if (delivery.getMessageFormat() == AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT) {
return coreLargeMessageReader != null ?
- coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this));
+ coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this, federation.isLargeMessageSync()));
Review Comment:
Same as above, not needed as it can be queried elsewhere.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java:
##########
@@ -78,6 +78,8 @@ public ProtonAbstractReceiver(AMQPSessionCallback sessionSPI,
this.creditRunnable = createCreditRunnable(connection);
useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
this.routingContext = new RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection());
+ this.standardMessageReader = new AMQPMessageReader(this);
+ this.largeMessageReader = new AMQPLargeMessageReader(this, sessionSPI.isLargeMessgeSync());
Review Comment:
This change is unnecessary if you just use the API added in the session
context to check the value.
##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java:
##########
@@ -139,7 +139,7 @@ public void testReadDeliveryAnnotationsFromDeliveryBuffer() throws Exception {
@Test
public void testReadMessageByteByByteFromDeliveryBuffer() throws Exception {
- AMQPTunneledCoreLargeMessageReader reader = new AMQPTunneledCoreLargeMessageReader(serverReceiver);
+ AMQPTunneledCoreLargeMessageReader reader = new AMQPTunneledCoreLargeMessageReader(serverReceiver, true);
Review Comment:
Same as the others: use the session API that was added.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java:
##########
@@ -113,17 +113,20 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
private AMQPFederatedAddressDeliveryReceiver receiver;
private Receiver protonReceiver;
private boolean started;
+ private final boolean largeMessageSync;
private volatile boolean closed;
private Consumer<FederationConsumerInternal> remoteCloseHandler;
public AMQPFederationAddressConsumer(AMQPFederation federation, AMQPFederationConsumerConfiguration configuration,
- AMQPSessionContext session, FederationConsumerInfo consumerInfo, FederationReceiveFromAddressPolicy policy) {
+ AMQPSessionContext session, FederationConsumerInfo consumerInfo, FederationReceiveFromAddressPolicy policy,
+ boolean largeMessageSync) {
this.federation = federation;
this.consumerInfo = consumerInfo;
this.policy = policy;
this.connection = session.getAMQPConnectionContext();
this.session = session;
this.configuration = configuration;
+ this.largeMessageSync = largeMessageSync;
Review Comment:
Same as above, not needed as it can be queried elsewhere.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java:
##########
@@ -508,7 +511,7 @@ protected MessageReader trySelectMessageReader(Receiver receiver, Delivery deliv
coreMessageReader : (coreMessageReader = new AMQPTunneledCoreMessageReader(this));
} else if (delivery.getMessageFormat() == AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT) {
return coreLargeMessageReader != null ?
- coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this));
+ coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this, largeMessageSync));
Review Comment:
Same as above, not needed as it can be queried elsewhere.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java:
##########
@@ -81,7 +83,7 @@ protected FederationConsumerInternal createFederationConsumer(FederationConsumer
// Don't initiate anything yet as the caller might need to register error handlers etc
// before the attach is sent otherwise they could miss the failure case.
- return new AMQPFederationAddressConsumer(federation, configuration, federation.getSessionContext(), consumerInfo, policy);
+ return new AMQPFederationAddressConsumer(federation, configuration, federation.getSessionContext(), consumerInfo, policy, largeMessageSync);
Review Comment:
Same as above, not needed as it can be queried elsewhere.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java:
##########
@@ -204,7 +206,7 @@ public Message readBytes(Delivery delivery) throws Exception {
final Message result = coreLargeMessage.toMessage();
// We don't want a close to delete the file now, so we release these resources.
- coreLargeMessage.releaseResources(true, true);
+ coreLargeMessage.releaseResources(largeMessageSync, true);
Review Comment:
You don't need to track this value, just call the accessor in the Session
context
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -299,7 +299,7 @@ protected MessageReader trySelectMessageReader(Receiver receiver, Delivery deliv
coreMessageReader : (coreMessageReader = new AMQPTunneledCoreMessageReader(this));
} else if (delivery.getMessageFormat() == AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT) {
return coreLargeMessageReader != null ?
- coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this));
+ coreLargeMessageReader : (coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this, server.getConfiguration().isLargeMessageSync()));
Review Comment:
Same as above, not needed as it can be queried elsewhere.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -101,7 +104,7 @@ public Message readBytes(Delivery delivery) throws Exception {
final AMQPLargeMessage result;
if (!delivery.isPartial()) {
- currentMessage.releaseResources(true, true);
+ currentMessage.releaseResources(largeMessageSync, true);
Review Comment:
You don't need to track this value, just call the accessor in the Session
context
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java:
##########
@@ -46,9 +46,9 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
// Cached instances used for this receiver which will be swapped as message of varying types
// are sent to this receiver from the remote peer.
- protected final MessageReader standardMessageReader = new AMQPMessageReader(this);
+ protected final MessageReader standardMessageReader;
Review Comment:
Same comment as above, don't change this, just use the accessor in the
session context
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java:
##########
@@ -98,9 +98,11 @@ private enum State {
private LargeServerMessage coreLargeMessage;
private int largeMessageSectionRemaining;
private State state = State.CLOSED;
+ private final boolean largeMessageSync;
Review Comment:
You don't need to track this value, just call the accessor in the Session
context
Issue Time Tracking
-------------------
Worklog Id: (was: 896347)
Time Spent: 20m (was: 10m)
> Option to not sync large messages
> ---------------------------------
>
> Key: ARTEMIS-4544
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4544
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.32.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> When writing large messages we always sync the large message before closing.
> We should have an option to bypass that and trust the OS on that.
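
To make the request in the description concrete, here is a minimal Java sketch of an optional sync before close, assuming a plain `FileChannel`. `LargeMessageFileSketch` and `writeAndClose` are hypothetical names and this is not the Artemis implementation. With `sync` set to true, `force(true)` asks the OS to flush the file to storage before the channel closes. With false, the written bytes may still sit in the OS cache when the method returns.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, for illustration only.
final class LargeMessageFileSketch {

    // Writes the body to the file and closes it, syncing first only if requested.
    static void writeAndClose(Path file, byte[] body, boolean sync) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(body);
            // A single write may be partial, so loop until the buffer is drained.
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            if (sync) {
                // Flush file content and metadata to the storage device.
                channel.force(true);
            }
        }
    }
}
```

Skipping the sync trades durability on a crash or power loss for lower latency on close, which is the trade-off the option would expose.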