gemmellr commented on code in PR #5173:
URL: https://github.com/apache/activemq-artemis/pull/5173#discussion_r1734907573


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -177,6 +180,48 @@ public void onError(int errorCode, String errorMessage) {
 
    private AckManager ackManager;
 
+   /** This method will wait both replication and storage to finish their current operations. */
+   public void flush() {
+      CountDownLatch latch = new CountDownLatch(1);
+      connection.runNow(() -> {
+         OperationContext oldContext = OperationContextImpl.getContext();
+         try {
+            OperationContextImpl.setContext(mirrorContext);
+            mirrorContext.executeOnCompletion(new IOCallback() {
+               @Override
+               public void done() {
+                  latch.countDown();
+               }
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+                  logger.warn("error code = {} / message = {}", errorCode, errorMessage);
+                  latch.countDown();
+               }
+            });
+         } finally {
+            OperationContextImpl.setContext(oldContext);
+         }
+      });
+
+      long timeout;
+      try {
+         timeout = connection.getProtocolManager().getAckManagerFlushTimeout();
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+         timeout = 10_000;
+      }
+
+      try {
+         if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
+            ActiveMQAMQPProtocolLogger.LOGGER.timedOutAckManager(timeout);
+         }
+      } catch (InterruptedException e) {
+         logger.warn(e.getMessage(), e);

Review Comment:
   Similarly, this could do with a useful message.
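
   As a rough illustration of the kind of message meant here, the wait could report what was interrupted, how long it was willing to wait, and what the caller can no longer assume. This is only a sketch: `FlushWaitSketch`, `Outcome`, and `awaitFlush` are hypothetical names, `java.util.logging` stands in for the project's logger, and restoring the interrupt flag is a common practice added here, not something the PR does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: shows a descriptive message per failure mode.
final class FlushWaitSketch {

   private static final Logger LOG = Logger.getLogger(FlushWaitSketch.class.getName());

   /** Possible results of waiting for the flush callback. */
   enum Outcome { COMPLETED, TIMED_OUT, INTERRUPTED }

   /** Waits on the latch and, on failure, logs what happened and what it implies. */
   static Outcome awaitFlush(CountDownLatch latch, long timeoutMillis) {
      try {
         if (latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return Outcome.COMPLETED;
         }
         LOG.warning("Timed out after " + timeoutMillis + " ms waiting for the mirror target flush; "
            + "storage or replication may still have operations in flight");
         return Outcome.TIMED_OUT;
      } catch (InterruptedException e) {
         LOG.log(Level.WARNING, "Interrupted while waiting up to " + timeoutMillis
            + " ms for the mirror target flush; continuing without confirmation that it completed", e);
         // Assumption: restore the flag so callers can still observe the interrupt.
         Thread.currentThread().interrupt();
         return Outcome.INTERRUPTED;
      }
   }
}
```

   Returning an outcome instead of swallowing the failure is a design choice of the sketch; the PR's `flush()` returns `void`.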



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -177,6 +180,48 @@ public void onError(int errorCode, String errorMessage) {
 
    private AckManager ackManager;
 
+   /** This method will wait both replication and storage to finish their current operations. */
+   public void flush() {
+      CountDownLatch latch = new CountDownLatch(1);
+      connection.runNow(() -> {
+         OperationContext oldContext = OperationContextImpl.getContext();
+         try {
+            OperationContextImpl.setContext(mirrorContext);
+            mirrorContext.executeOnCompletion(new IOCallback() {
+               @Override
+               public void done() {
+                  latch.countDown();
+               }
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+                  logger.warn("error code = {} / message = {}", errorCode, errorMessage);

Review Comment:
   The log message isn't particularly descriptive; it should be clearer about what is going on and what the impact is.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java:
##########
@@ -54,17 +54,13 @@ public AckRetry(String nodeID, long messageID, AckReason reason) {
    }
 
 
-   public byte[] getTemporaryNodeBytes() {
+   public synchronized byte[] getNodeIDBytes() {

Review Comment:
   It seems odd to rename the method but not the field.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java:
##########
@@ -198,6 +203,18 @@ public AMQPConnectionContext(ProtonProtocolManager protocolManager,
       }
    }
 
+   public List<AMQPMirrorControllerTarget> getMirrorControllerTargets() {
+      return mirrorControllerTargets;
+   }
+
+   public AMQPConnectionContext addMirrorControllerTarget(AMQPMirrorControllerTarget mirrorControllerTarget) {
+      if (mirrorControllerTargets == null) {
+         mirrorControllerTargets = new ArrayList<>();
+      }
+      mirrorControllerTargets.add(mirrorControllerTarget);

Review Comment:
   There is no remove anywhere, so this is already a potential leak.
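
   One way to address this would be to pair every add with a remove that runs when the target goes away. The sketch below is only an illustration under assumptions: `MirrorTargetHolderSketch` and its methods are hypothetical, the element type is generic rather than `AMQPMirrorControllerTarget`, and where the remove would actually be called (link close, connection close) is not something the diff shows.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical holder, not part of the PR: every add has a matching remove.
final class MirrorTargetHolderSketch<T> {

   // Copy-on-write keeps iteration safe if another thread adds or removes concurrently.
   private final List<T> targets = new CopyOnWriteArrayList<>();

   /** Registers a target; returns this holder so calls can be chained. */
   MirrorTargetHolderSketch<T> add(T target) {
      targets.add(target);
      return this;
   }

   /** Drops a target once it is closed; returns false if it was not registered. */
   boolean remove(T target) {
      return targets.remove(target);
   }

   /** Immutable copy of the currently registered targets. */
   List<T> snapshot() {
      return List.copyOf(targets);
   }
}
```

   Initialising the list eagerly also removes the need for the null check in the filter that scans connections, at the cost of one small allocation per connection.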



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -177,6 +180,48 @@ public void onError(int errorCode, String errorMessage) {
 
    private AckManager ackManager;
 
+   /** This method will wait both replication and storage to finish their current operations. */
+   public void flush() {
+      CountDownLatch latch = new CountDownLatch(1);
+      connection.runNow(() -> {
+         OperationContext oldContext = OperationContextImpl.getContext();
+         try {
+            OperationContextImpl.setContext(mirrorContext);
+            mirrorContext.executeOnCompletion(new IOCallback() {
+               @Override
+               public void done() {
+                  latch.countDown();
+               }
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+                  logger.warn("error code = {} / message = {}", errorCode, errorMessage);
+                  latch.countDown();
+               }
+            });
+         } finally {
+            OperationContextImpl.setContext(oldContext);
+         }
+      });
+
+      long timeout;
+      try {
+         timeout = connection.getProtocolManager().getAckManagerFlushTimeout();
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+         timeout = 10_000;
+      }

Review Comment:
   This one could even do with a message of its own. The potential failure would seem to be an NPE if the connection or protocol manager isn't there, in which case whether the log says anything useful at all depends on which JVM is being used (newer JVMs will indicate what was null; older ones won't).
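
   A possible shape for this, sketched under assumptions: check each link in the chain explicitly and log which one was missing, rather than catching `Throwable` and relying on the JVM's NPE text. The nested `Connection` and `ProtocolManager` interfaces are hypothetical stand-ins for the real types, `resolveFlushTimeout` is an invented name, and `java.util.logging` stands in for the project's logger; only `getProtocolManager()`, `getAckManagerFlushTimeout()`, and the 10_000 ms fallback come from the diff.

```java
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: explicit null checks with their own messages.
final class FlushTimeoutSketch {

   private static final Logger LOG = Logger.getLogger(FlushTimeoutSketch.class.getName());

   /** Same fallback value as the diff uses when the lookup fails. */
   static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = 10_000;

   /** Stand-in for the protocol manager; only the getter used in the diff. */
   interface ProtocolManager {
      long getAckManagerFlushTimeout();
   }

   /** Stand-in for the AMQP connection context. */
   interface Connection {
      ProtocolManager getProtocolManager();
   }

   /** Returns the configured timeout, or the default with a message naming what was missing. */
   static long resolveFlushTimeout(Connection connection) {
      if (connection == null) {
         LOG.warning("No connection available to read the ack manager flush timeout; using default of "
            + DEFAULT_FLUSH_TIMEOUT_MILLIS + " ms");
         return DEFAULT_FLUSH_TIMEOUT_MILLIS;
      }
      ProtocolManager manager = connection.getProtocolManager();
      if (manager == null) {
         LOG.warning("Connection has no protocol manager, so the ack manager flush timeout cannot be read; "
            + "using default of " + DEFAULT_FLUSH_TIMEOUT_MILLIS + " ms");
         return DEFAULT_FLUSH_TIMEOUT_MILLIS;
      }
      return manager.getAckManagerFlushTimeout();
   }
}
```

   With the checks spelled out, the message is the same on every JVM and says which object was absent.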



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##########
@@ -149,15 +149,32 @@ public boolean initRetry() {
 
       HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retries = sortRetries();
 
+      scanAndFlushMirrorTargets();
+
       if (retries.isEmpty()) {
          logger.trace("Nothing to retry!, server={}", server);
          return false;
       }
 
-      progress = new MultiStepProgress(sortRetries());
+      progress = new MultiStepProgress(retries);
       return true;
    }
 
+   private void scanAndFlushMirrorTargets() {
+      logger.debug("scanning and flushing mirror targets");
+      // this will navigate on each connection, find the connection that has a mirror controller, and call flushMirrorTarget for each MirrorTargets. (it should be 1 in most cases)
+      // An alternative design instead of going through the connections, would be to register the MirrorTargets within the AckManager, however to avoid memory leaks after disconnects and reconnects it is safer to
+      // scan through the connections
+      server.getRemotingService().getConnections().stream().
+         filter(c -> c instanceof ActiveMQProtonRemotingConnection && ((ActiveMQProtonRemotingConnection) c).getAmqpConnection().getMirrorControllerTargets() != null).
+         forEach(c -> ((ActiveMQProtonRemotingConnection) c).getAmqpConnection().getMirrorControllerTargets().forEach(this::flushMirrorTarget));

Review Comment:
   This might work for now, but anyone with lots of connections seems likely to find that this causes a noticeable amount of CPU usage. Per the other comment, it's also already a leak of sorts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

