gemmellr commented on a change in pull request #3857:
URL: https://github.com/apache/activemq-artemis/pull/3857#discussion_r811813494



##########
File path: 
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
##########
@@ -42,10 +42,10 @@
    protected String password;
 
    @Option(name = "--clientID", description = "ClientID to be associated with 
connection")
-   String clientID;
+   protected String clientID;
 
    @Option(name = "--protocol", description = "Protocol used. Valid values are 
amqp or core. Default=core.")
-   String protocol = "core";
+   protected String protocol = "core";

Review comment:
       Why not just use the existing getter/setter?

##########
File path: 
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
##########
@@ -42,10 +42,10 @@
    protected String password;
 
    @Option(name = "--clientID", description = "ClientID to be associated with 
connection")
-   String clientID;
+   protected String clientID;

Review comment:
       Why not just use the existing getter/setter?

##########
File path: 
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/perf/AsyncJms2ProducerFacade.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.messages.perf;
+
+import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+import static java.util.Objects.requireNonNull;
+
+public final class AsyncJms2ProducerFacade {
+
+   private final long id;
+   protected final Session session;
+   private final MessageProducer producer;
+
+   private long pending;
+   private final long maxPending;
+
+   private final long transactionCapacity;
+   private long pendingMsgInTransaction;
+   private long completedMsgInTransaction;
+
+   private final List<Runnable> availableObservers;
+   private final List<Runnable> closedObservers;
+
+   private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> 
MESSAGE_SENT_UPDATER = 
AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, "messageSent");
+   private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> 
MESSAGE_COMPLETED_UPDATER = 
AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, 
"messageCompleted");
+   private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> 
NOT_AVAILABLE_UPDATER = 
AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, 
"notAvailable");
+
+   private volatile long messageSent;
+   private volatile long messageCompleted;
+   private volatile long notAvailable;
+
+   private boolean closing;
+   private boolean closed;
+   private final Destination destination;
+
+   public AsyncJms2ProducerFacade(final long id,
+                                  final Session session,
+                                  final MessageProducer producer,
+                                  final Destination destination,
+                                  final long maxPending,
+                                  final long transactionCapacity) {
+      this.id = id;
+      this.session = requireNonNull(session);
+      this.producer = requireNonNull(producer);
+      this.destination = destination;
+      this.pending = 0;
+      this.maxPending = transactionCapacity > 0 && maxPending > 0 ? 
Math.max(maxPending, transactionCapacity) : maxPending;
+      this.availableObservers = new ArrayList<>(1);
+      this.closedObservers = new ArrayList<>(1);
+      this.messageSent = 0;
+      this.messageCompleted = 0;
+      this.notAvailable = 0;
+      try {
+         if (transactionCapacity < 0) {
+            throw new IllegalStateException("transactionCapacity must be >= 
0");
+         }
+         if (transactionCapacity > 0) {
+            if (!session.getTransacted()) {
+               throw new IllegalStateException("session must be transacted or 
transactionCapacity should be null");
+            }
+         } else {
+            if (session.getTransacted()) {
+               throw new IllegalStateException("session cannot be transacted 
or transactionCapacity cannot be null ");
+            }
+         }
+      } catch (final JMSException ex) {
+         throw new IllegalStateException(ex);
+      }
+      this.transactionCapacity = transactionCapacity;
+      this.pendingMsgInTransaction = 0;
+      this.completedMsgInTransaction = 0;
+      this.closing = false;
+      this.closed = false;
+   }
+
+   public long getId() {
+      return id;
+   }
+
+   public Destination getDestination() {
+      return destination;
+   }
+
+   BytesMessage createBytesMessage() throws JMSException {
+      return session.createBytesMessage();
+   }
+
+   private void addedPendingSend() {
+      if (transactionCapacity > 0 && pendingMsgInTransaction == 
transactionCapacity) {
+         throw new IllegalStateException("cannot add more pending send");
+      }
+      if (maxPending > 0 && pending == maxPending) {
+         throw new IllegalStateException("cannot add more pending");
+      }
+      pending++;
+      pendingMsgInTransaction++;
+   }
+
+   private boolean isAvailable() {
+      if (maxPending > 0 && pending == maxPending) {
+         return false;
+      }
+      return transactionCapacity == 0 || pendingMsgInTransaction != 
transactionCapacity;
+   }
+
+   public enum SendAttemptResult {
+      Closing, Closed, NotAvailable, Success
+   }
+
+   public SendAttemptResult trySend(final Message message,
+                                    final CompletionListener 
completionListener,
+                                    final Runnable availableObserver) throws 
JMSException {
+      if (closing) {
+         return SendAttemptResult.Closing;
+      }
+      if (closed) {
+         return SendAttemptResult.Closed;
+      }
+      if (!isAvailable()) {
+         availableObservers.add(availableObserver);
+         orderedIncrementNotAvailable();
+         return SendAttemptResult.NotAvailable;
+      }
+      producer.send(message, completionListener);
+      orderedIncrementSent();
+      addedPendingSend();
+      return SendAttemptResult.Success;
+   }
+
+   public void onSendErrored() {
+      if (closed) {
+         return;
+      }
+      availableObservers.clear();
+      closedObservers.forEach(Runnable::run);
+      closedObservers.clear();
+      closed = true;
+   }
+
+   public void onSendCompleted() {
+      if (closed) {
+         return;
+      }
+      orderedIncrementCompleted();
+      if (transactionCapacity > 0 && completedMsgInTransaction == 
transactionCapacity) {
+         throw new IllegalStateException("cannot complete more send");
+      }
+      if (pending == 0) {
+         throw new IllegalStateException("cannot complete more send");
+      }
+      pending--;
+      completedMsgInTransaction++;
+      if (transactionCapacity > 0) {
+         if (completedMsgInTransaction == transactionCapacity || (closing && 
pending == 0)) {
+            completedMsgInTransaction = 0;
+            pendingMsgInTransaction = 0;
+            try {
+               session.commit();
+            } catch (final Throwable t) {
+               // TODO log

Review comment:
       Seems fairly important to do this at least. Not immediately clear why it 
wouldnt just throw. What if it was a TransactionRolledBackException?

##########
File path: 
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/perf/AsyncJms2ProducerFacade.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.messages.perf;
+
+import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+import static java.util.Objects.requireNonNull;
+
+public final class AsyncJms2ProducerFacade {
+
+   private final long id;
+   protected final Session session;
+   private final MessageProducer producer;
+
+   private long pending;
+   private final long maxPending;
+
+   private final long transactionCapacity;
+   private long pendingMsgInTransaction;
+   private long completedMsgInTransaction;
+
+   private final List<Runnable> availableObservers;
+   private final List<Runnable> closedObservers;
+
+   private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> 
MESSAGE_SENT_UPDATER = 
AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, "messageSent");
+   private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> 
MESSAGE_COMPLETED_UPDATER = 
AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, 
"messageCompleted");
+   private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> 
NOT_AVAILABLE_UPDATER = 
AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, 
"notAvailable");
+
+   private volatile long messageSent;
+   private volatile long messageCompleted;
+   private volatile long notAvailable;
+
+   private boolean closing;
+   private boolean closed;
+   private final Destination destination;
+
+   public AsyncJms2ProducerFacade(final long id,
+                                  final Session session,
+                                  final MessageProducer producer,
+                                  final Destination destination,
+                                  final long maxPending,
+                                  final long transactionCapacity) {
+      this.id = id;
+      this.session = requireNonNull(session);
+      this.producer = requireNonNull(producer);
+      this.destination = destination;
+      this.pending = 0;
+      this.maxPending = transactionCapacity > 0 && maxPending > 0 ? 
Math.max(maxPending, transactionCapacity) : maxPending;
+      this.availableObservers = new ArrayList<>(1);
+      this.closedObservers = new ArrayList<>(1);
+      this.messageSent = 0;
+      this.messageCompleted = 0;
+      this.notAvailable = 0;
+      try {
+         if (transactionCapacity < 0) {
+            throw new IllegalStateException("transactionCapacity must be >= 
0");
+         }
+         if (transactionCapacity > 0) {
+            if (!session.getTransacted()) {
+               throw new IllegalStateException("session must be transacted or 
transactionCapacity should be null");
+            }
+         } else {
+            if (session.getTransacted()) {
+               throw new IllegalStateException("session cannot be transacted 
or transactionCapacity cannot be null ");

Review comment:
       Seems odd to reference null when checking a longs relative value against 
0.

##########
File path: 
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/perf/AsyncJms2ProducerFacade.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.messages.perf;
+
+import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+import static java.util.Objects.requireNonNull;
+
+public final class AsyncJms2ProducerFacade {
+
+   private final long id;
+   protected final Session session;
+   private final MessageProducer producer;
+
+   private long pending;
+   private final long maxPending;
+
+   private final long transactionCapacity;
+   private long pendingMsgInTransaction;
+   private long completedMsgInTransaction;
+
+   private final List<Runnable> availableObservers;
+   private final List<Runnable> closedObservers;
+
+   private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> 
MESSAGE_SENT_UPDATER = 
AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, "messageSent");
+   private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> 
MESSAGE_COMPLETED_UPDATER = 
AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, 
"messageCompleted");
+   private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> 
NOT_AVAILABLE_UPDATER = 
AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, 
"notAvailable");
+
+   private volatile long messageSent;
+   private volatile long messageCompleted;
+   private volatile long notAvailable;
+
+   private boolean closing;
+   private boolean closed;
+   private final Destination destination;
+
+   public AsyncJms2ProducerFacade(final long id,
+                                  final Session session,
+                                  final MessageProducer producer,
+                                  final Destination destination,
+                                  final long maxPending,
+                                  final long transactionCapacity) {
+      this.id = id;
+      this.session = requireNonNull(session);
+      this.producer = requireNonNull(producer);
+      this.destination = destination;
+      this.pending = 0;
+      this.maxPending = transactionCapacity > 0 && maxPending > 0 ? 
Math.max(maxPending, transactionCapacity) : maxPending;
+      this.availableObservers = new ArrayList<>(1);
+      this.closedObservers = new ArrayList<>(1);
+      this.messageSent = 0;
+      this.messageCompleted = 0;
+      this.notAvailable = 0;
+      try {
+         if (transactionCapacity < 0) {
+            throw new IllegalStateException("transactionCapacity must be >= 
0");
+         }
+         if (transactionCapacity > 0) {
+            if (!session.getTransacted()) {
+               throw new IllegalStateException("session must be transacted or 
transactionCapacity should be null");
+            }
+         } else {
+            if (session.getTransacted()) {
+               throw new IllegalStateException("session cannot be transacted 
or transactionCapacity cannot be null ");
+            }
+         }
+      } catch (final JMSException ex) {
+         throw new IllegalStateException(ex);
+      }
+      this.transactionCapacity = transactionCapacity;
+      this.pendingMsgInTransaction = 0;
+      this.completedMsgInTransaction = 0;
+      this.closing = false;
+      this.closed = false;
+   }
+
+   public long getId() {
+      return id;
+   }
+
+   public Destination getDestination() {
+      return destination;
+   }
+
+   BytesMessage createBytesMessage() throws JMSException {
+      return session.createBytesMessage();
+   }
+
+   private void addedPendingSend() {
+      if (transactionCapacity > 0 && pendingMsgInTransaction == 
transactionCapacity) {
+         throw new IllegalStateException("cannot add more pending send");
+      }
+      if (maxPending > 0 && pending == maxPending) {
+         throw new IllegalStateException("cannot add more pending");

Review comment:
       Should the messages match (or else be more clearly differentiated)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to