gemmellr commented on code in PR #5173: URL: https://github.com/apache/activemq-artemis/pull/5173#discussion_r1734907573
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -177,6 +180,48 @@ public void onError(int errorCode, String errorMessage) { private AckManager ackManager; + /** This method will wait both replication and storage to finish their current operations. */ + public void flush() { + CountDownLatch latch = new CountDownLatch(1); + connection.runNow(() -> { + OperationContext oldContext = OperationContextImpl.getContext(); + try { + OperationContextImpl.setContext(mirrorContext); + mirrorContext.executeOnCompletion(new IOCallback() { + @Override + public void done() { + latch.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + logger.warn("error code = {} / message = {}", errorCode, errorMessage); + latch.countDown(); + } + }); + } finally { + OperationContextImpl.setContext(oldContext); + } + }); + + long timeout; + try { + timeout = connection.getProtocolManager().getAckManagerFlushTimeout(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + timeout = 10_000; + } + + try { + if (!latch.await(timeout, TimeUnit.MILLISECONDS)) { + ActiveMQAMQPProtocolLogger.LOGGER.timedOutAckManager(timeout); + } + } catch (InterruptedException e) { + logger.warn(e.getMessage(), e); Review Comment: Similarly, this could do with a more useful message. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -177,6 +180,48 @@ public void onError(int errorCode, String errorMessage) { private AckManager ackManager; + /** This method will wait both replication and storage to finish their current operations.
*/ + public void flush() { + CountDownLatch latch = new CountDownLatch(1); + connection.runNow(() -> { + OperationContext oldContext = OperationContextImpl.getContext(); + try { + OperationContextImpl.setContext(mirrorContext); + mirrorContext.executeOnCompletion(new IOCallback() { + @Override + public void done() { + latch.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + logger.warn("error code = {} / message = {}", errorCode, errorMessage); Review Comment: The log message isn't particularly descriptive; it should make clearer what is going on and what the impact is. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java: ########## @@ -54,17 +54,13 @@ public AckRetry(String nodeID, long messageID, AckReason reason) { } - public byte[] getTemporaryNodeBytes() { + public synchronized byte[] getNodeIDBytes() { Review Comment: Seems odd to rename the method but not the field. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java: ########## @@ -198,6 +203,18 @@ public AMQPConnectionContext(ProtonProtocolManager protocolManager, } } + public List<AMQPMirrorControllerTarget> getMirrorControllerTargets() { + return mirrorControllerTargets; + } + + public AMQPConnectionContext addMirrorControllerTarget(AMQPMirrorControllerTarget mirrorControllerTarget) { + if (mirrorControllerTargets == null) { + mirrorControllerTargets = new ArrayList<>(); + } + mirrorControllerTargets.add(mirrorControllerTarget); Review Comment: There is no remove anywhere, so this is already a potential leak.
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -177,6 +180,48 @@ public void onError(int errorCode, String errorMessage) { private AckManager ackManager; + /** This method will wait both replication and storage to finish their current operations. */ + public void flush() { + CountDownLatch latch = new CountDownLatch(1); + connection.runNow(() -> { + OperationContext oldContext = OperationContextImpl.getContext(); + try { + OperationContextImpl.setContext(mirrorContext); + mirrorContext.executeOnCompletion(new IOCallback() { + @Override + public void done() { + latch.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + logger.warn("error code = {} / message = {}", errorCode, errorMessage); + latch.countDown(); + } + }); + } finally { + OperationContextImpl.setContext(oldContext); + } + }); + + long timeout; + try { + timeout = connection.getProtocolManager().getAckManagerFlushTimeout(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + timeout = 10_000; + } Review Comment: This one could even do with having a message of its own. The potential failure would seem to be an NPE if the connection or protocol manager isn't there, in which case it's just going to depend on which JVM is being used whether it says anything useful at all (newer JVMs will indicate what was null; older ones won't).
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java: ########## @@ -149,15 +149,32 @@ public boolean initRetry() { HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retries = sortRetries(); + scanAndFlushMirrorTargets(); + if (retries.isEmpty()) { logger.trace("Nothing to retry!, server={}", server); return false; } - progress = new MultiStepProgress(sortRetries()); + progress = new MultiStepProgress(retries); return true; } + private void scanAndFlushMirrorTargets() { + logger.debug("scanning and flushing mirror targets"); + // this will navigate on each connection, find the connection that has a mirror controller, and call flushMirrorTarget for each MirrorTargets. (it should be 1 in most cases) + // An alternative design instead of going through the connections, would be to register the MirrorTargets within the AckManager, however to avoid memory leaks after disconnects and reconnects it is safer to + // scan through the connections + server.getRemotingService().getConnections().stream(). + filter(c -> c instanceof ActiveMQProtonRemotingConnection && ((ActiveMQProtonRemotingConnection) c).getAmqpConnection().getMirrorControllerTargets() != null). + forEach(c -> ((ActiveMQProtonRemotingConnection) c).getAmqpConnection().getMirrorControllerTargets().forEach(this::flushMirrorTarget)); Review Comment: This might work for now, but anyone with lots of connections seems likely to find this causes a noticeable amount of CPU usage. Per the other comment, it's also already a leak of sorts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact