ARTEMIS-904 Remove cyclic dependencies from artemis-cli: move classes and methods to their correct locations to avoid cyclic dependencies between packages and classes.
ARTEMIS-904 Remove cyclic dependencies from artemis-cli move classes and methods to their correct location to avoid cyclic dependencies between packages and classes. ARTEMIS-904 Remove cyclic dependencies from artemis-cli move classes and methods to their correct location to avoid cyclic dependencies between packages and classes. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f82623a2 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f82623a2 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f82623a2 Branch: refs/heads/master Commit: f82623a20c3bcd7acdfd01f7ee75b1de4a7d2e96 Parents: eeca06d Author: Bennet Schulz <[email protected]> Authored: Thu Apr 27 15:36:38 2017 +0200 Committer: Justin Bertram <[email protected]> Committed: Thu May 4 10:25:06 2017 -0500 ---------------------------------------------------------------------- .../apache/activemq/artemis/cli/Artemis.java | 36 +- .../artemis/cli/commands/ActionAbstract.java | 1 - .../artemis/cli/commands/Configurable.java | 2 +- .../activemq/artemis/cli/commands/Create.java | 274 ++------ .../artemis/cli/commands/HelpAction.java | 1 - .../artemis/cli/commands/InputAbstract.java | 7 +- .../activemq/artemis/cli/commands/Mask.java | 9 +- .../artemis/cli/commands/OptionsUtil.java | 65 ++ .../activemq/artemis/cli/commands/Run.java | 10 +- .../cli/commands/address/HelpAddress.java | 2 +- .../artemis/cli/commands/messages/Browse.java | 1 - .../artemis/cli/commands/messages/Consumer.java | 1 - .../cli/commands/messages/ConsumerThread.java | 320 +++++++++ .../artemis/cli/commands/messages/Producer.java | 1 - .../cli/commands/messages/ProducerThread.java | 344 +++++++++ .../artemis/cli/commands/queue/HelpQueue.java | 2 +- .../cli/commands/tools/CompactJournal.java | 65 -- .../cli/commands/tools/DecodeJournal.java | 318 --------- .../cli/commands/tools/EncodeJournal.java | 208 ------ 
.../artemis/cli/commands/tools/HelpData.java | 2 +- .../cli/commands/tools/LockAbstract.java | 19 +- .../artemis/cli/commands/tools/PerfJournal.java | 92 --- .../artemis/cli/commands/tools/PrintData.java | 37 +- .../cli/commands/tools/XmlDataConstants.java | 129 ---- .../cli/commands/tools/XmlDataExporter.java | 626 ----------------- .../cli/commands/tools/XmlDataExporterUtil.java | 107 --- .../cli/commands/tools/XmlDataImporter.java | 690 ------------------- .../commands/tools/journal/CompactJournal.java | 66 ++ .../commands/tools/journal/DecodeJournal.java | 296 ++++++++ .../commands/tools/journal/EncodeJournal.java | 191 +++++ .../cli/commands/tools/journal/PerfJournal.java | 92 +++ .../commands/tools/xml/XmlDataConstants.java | 81 +++ .../cli/commands/tools/xml/XmlDataExporter.java | 627 +++++++++++++++++ .../commands/tools/xml/XmlDataExporterUtil.java | 104 +++ .../cli/commands/tools/xml/XmlDataImporter.java | 614 +++++++++++++++++ .../artemis/cli/commands/user/AddUser.java | 3 +- .../commands/user/FileBasedSecStoreConfig.java | 222 ++++++ .../artemis/cli/commands/user/HelpUser.java | 10 +- .../artemis/cli/commands/user/ListUser.java | 3 +- .../cli/commands/user/PasswordAction.java | 2 +- .../artemis/cli/commands/user/RemoveUser.java | 3 +- .../artemis/cli/commands/user/ResetUser.java | 3 +- .../artemis/cli/commands/user/UserAction.java | 23 +- .../cli/commands/util/ConsumerThread.java | 320 --------- .../cli/commands/util/ProducerThread.java | 344 --------- .../artemis/cli/factory/BrokerFactory.java | 73 ++ .../cli/factory/BrokerFactoryHandler.java | 26 + .../artemis/cli/factory/BrokerHandler.java | 26 + .../artemis/cli/factory/FileBrokerHandler.java | 30 + .../factory/security/JaasSecurityHandler.java | 31 + .../cli/factory/security/SecurityHandler.java | 25 + .../security/SecurityManagerFactory.java | 35 + .../factory/xml/XmlBrokerFactoryHandler.java | 37 + .../activemq/artemis/factory/BrokerFactory.java | 82 --- 
.../artemis/factory/BrokerFactoryHandler.java | 28 - .../activemq/artemis/factory/BrokerHandler.java | 26 - .../artemis/factory/FileBrokerHandler.java | 30 - .../artemis/factory/JaasSecurityHandler.java | 32 - .../artemis/factory/SecurityHandler.java | 25 - .../artemis/factory/SecurityManagerFactory.java | 37 - .../factory/XmlBrokerFactoryHandler.java | 41 -- .../artemis/integration/FileBroker.java | 4 +- .../artemis/util/FileBasedSecStoreConfig.java | 222 ------ .../activemq/artemis/util/OptionsUtil.java | 67 -- .../activemq/artemis/util/ServerUtil.java | 8 +- .../artemis/broker/security/jaas-security | 2 +- .../apache/activemq/artemis/broker/server/file | 2 +- .../org/apache/activemq/artemis/broker/xml | 2 +- .../persistence/ExportFormatTest.java | 4 +- .../persistence/XmlImportExportTest.java | 6 +- .../journal/XmlImportExportStressTest.java | 4 +- .../core/journal/impl/JournalImplTestBase.java | 4 +- 72 files changed, 3466 insertions(+), 3816 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java index 2821a3e..45e1dfd 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java @@ -17,8 +17,6 @@ package org.apache.activemq.artemis.cli; import java.io.File; -import java.io.InputStream; -import java.io.OutputStream; import java.util.List; import io.airlift.airline.Cli; @@ -37,22 +35,22 @@ import org.apache.activemq.artemis.cli.commands.address.DeleteAddress; import org.apache.activemq.artemis.cli.commands.address.HelpAddress; import 
org.apache.activemq.artemis.cli.commands.address.ShowAddress; import org.apache.activemq.artemis.cli.commands.address.UpdateAddress; +import org.apache.activemq.artemis.cli.commands.messages.Browse; +import org.apache.activemq.artemis.cli.commands.messages.Consumer; +import org.apache.activemq.artemis.cli.commands.messages.Producer; import org.apache.activemq.artemis.cli.commands.migration1x.Migrate1X; import org.apache.activemq.artemis.cli.commands.queue.CreateQueue; import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue; import org.apache.activemq.artemis.cli.commands.queue.HelpQueue; -import org.apache.activemq.artemis.cli.commands.messages.Browse; -import org.apache.activemq.artemis.cli.commands.messages.Consumer; -import org.apache.activemq.artemis.cli.commands.messages.Producer; import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue; -import org.apache.activemq.artemis.cli.commands.tools.CompactJournal; -import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal; -import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal; import org.apache.activemq.artemis.cli.commands.tools.HelpData; import org.apache.activemq.artemis.cli.commands.tools.PrintData; -import org.apache.activemq.artemis.cli.commands.tools.PerfJournal; -import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter; -import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter; +import org.apache.activemq.artemis.cli.commands.tools.journal.CompactJournal; +import org.apache.activemq.artemis.cli.commands.tools.journal.DecodeJournal; +import org.apache.activemq.artemis.cli.commands.tools.journal.EncodeJournal; +import org.apache.activemq.artemis.cli.commands.tools.journal.PerfJournal; +import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporter; +import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataImporter; import org.apache.activemq.artemis.cli.commands.user.AddUser; import 
org.apache.activemq.artemis.cli.commands.user.HelpUser; import org.apache.activemq.artemis.cli.commands.user.ListUser; @@ -126,7 +124,7 @@ public class Artemis { * This method is used to validate exception returns. * Useful on test cases */ - public static Object internalExecute(File artemisHome, File artemisInstance, String[] args) throws Exception { + private static Object internalExecute(File artemisHome, File artemisInstance, String[] args) throws Exception { return internalExecute(artemisHome, artemisInstance, args, ActionContext.system()); } @@ -174,18 +172,4 @@ public class Artemis { return builder; } - public static void printBanner() throws Exception { - copy(Artemis.class.getResourceAsStream("banner.txt"), System.out); - } - - private static long copy(InputStream in, OutputStream out) throws Exception { - byte[] buffer = new byte[1024]; - int len = in.read(buffer); - while (len != -1) { - out.write(buffer, 0, len); - len = in.read(buffer); - } - return len; - } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/ActionAbstract.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/ActionAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/ActionAbstract.java index ce90c23..2037d01 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/ActionAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/ActionAbstract.java @@ -20,7 +20,6 @@ import java.io.File; import java.net.URI; import io.airlift.airline.Option; -import org.apache.activemq.artemis.util.OptionsUtil; public abstract class ActionAbstract implements Action { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Configurable.java 
---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Configurable.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Configurable.java index 316b2c2..9046c8f 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Configurable.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Configurable.java @@ -26,10 +26,10 @@ import io.airlift.airline.Option; import io.airlift.airline.model.CommandGroupMetadata; import io.airlift.airline.model.CommandMetadata; import io.airlift.airline.model.GlobalMetadata; +import org.apache.activemq.artemis.cli.factory.BrokerFactory; import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.config.impl.FileConfiguration; import org.apache.activemq.artemis.dto.BrokerDTO; -import org.apache.activemq.artemis.factory.BrokerFactory; import org.apache.activemq.artemis.integration.bootstrap.ActiveMQBootstrapLogger; import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index 66bffe2..feb23dd 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -84,9 +84,9 @@ public class Create extends InputAbstract { public static final String ETC_ARTEMIS_ROLES_PROPERTIES = "etc/artemis-roles.properties"; public static final String ETC_ARTEMIS_USERS_PROPERTIES = "etc/artemis-users.properties"; - public static 
final String ETC_LOGIN_CONFIG = "etc/login.config"; - public static final String ETC_LOGIN_CONFIG_WITH_GUEST = "etc/login-with-guest.config"; - public static final String ETC_LOGIN_CONFIG_WITHOUT_GUEST = "etc/login-without-guest.config"; + private static final String ETC_LOGIN_CONFIG = "etc/login.config"; + private static final String ETC_LOGIN_CONFIG_WITH_GUEST = "etc/login-with-guest.config"; + private static final String ETC_LOGIN_CONFIG_WITHOUT_GUEST = "etc/login-without-guest.config"; public static final String ETC_REPLICATED_SETTINGS_TXT = "etc/replicated-settings.txt"; public static final String ETC_SHARED_STORE_SETTINGS_TXT = "etc/shared-store-settings.txt"; public static final String ETC_CLUSTER_SECURITY_SETTINGS_TXT = "etc/cluster-security-settings.txt"; @@ -105,213 +105,162 @@ public class Create extends InputAbstract { public static final String ETC_GLOBAL_MAX_DEFAULT_TXT = "etc/global-max-default.txt"; @Arguments(description = "The instance directory to hold the broker's configuration and data. Path must be writable.", required = true) - File directory; + private File directory; @Option(name = "--host", description = "The host name of the broker (Default: 0.0.0.0 or input if clustered)") - String host; + private String host; @Option(name = "--http-host", description = "The host name to use for embedded web server (Default: localhost)") - String httpHost = HTTP_HOST; + private String httpHost = HTTP_HOST; @Option(name = "--ping", description = "A comma separated string to be passed on to the broker config as network-check-list. 
The broker will shutdown when all these addresses are unreachable.") - String ping; + private String ping; @Option(name = "--default-port", description = "The port number to use for the main 'artemis' acceptor (Default: 61616)") - int defaultPort = DEFAULT_PORT; + private int defaultPort = DEFAULT_PORT; @Option(name = "--http-port", description = "The port number to use for embedded web server (Default: 8161)") - int httpPort = HTTP_PORT; + private int httpPort = HTTP_PORT; @Option(name = "--ssl-key", description = "The key store path for embedded web server") - String sslKey; + private String sslKey; @Option(name = "--ssl-key-password", description = "The key store password") - String sslKeyPassword; + private String sslKeyPassword; @Option(name = "--use-client-auth", description = "If the embedded server requires client authentication") - boolean useClientAuth; + private boolean useClientAuth; @Option(name = "--ssl-trust", description = "The trust store path in case of client authentication") - String sslTrust; + private String sslTrust; @Option(name = "--ssl-trust-password", description = "The trust store password") - String sslTrustPassword; + private String sslTrustPassword; @Option(name = "--name", description = "The name of the broker (Default: same as host)") - String name; + private String name; @Option(name = "--port-offset", description = "Off sets the ports of every acceptor") - int portOffset; + private int portOffset; @Option(name = "--force", description = "Overwrite configuration at destination directory") - boolean force; + private boolean force; @Option(name = "--home", description = "Directory where ActiveMQ Artemis is installed") - File home; + private File home; @Option(name = "--data", description = "Directory where ActiveMQ Data is used. 
Paths are relative to artemis.instance") - String data = "./data"; + private String data = "./data"; @Option(name = "--clustered", description = "Enable clustering") - boolean clustered = false; + private boolean clustered = false; @Option(name = "--max-hops", description = "Number of hops on the cluster configuration") - int maxHops = 0; + private int maxHops = 0; @Option(name = "--message-load-balancing", description = "Load balancing policy on cluster. [ON_DEMAND (default) | STRICT | OFF]") - MessageLoadBalancingType messageLoadBalancing = MessageLoadBalancingType.ON_DEMAND; + private MessageLoadBalancingType messageLoadBalancing = MessageLoadBalancingType.ON_DEMAND; @Option(name = "--replicated", description = "Enable broker replication") - boolean replicated = false; + private boolean replicated = false; @Option(name = "--shared-store", description = "Enable broker shared store") - boolean sharedStore = false; + private boolean sharedStore = false; @Option(name = "--slave", description = "Valid for shared store or replication: this is a slave server?") - boolean slave; + private boolean slave; @Option(name = "--failover-on-shutdown", description = "Valid for shared store: will shutdown trigger a failover? (Default: false)") - boolean failoverOnShutodwn; + private boolean failoverOnShutodwn; @Option(name = "--cluster-user", description = "The cluster user to use for clustering. (Default: input)") - String clusterUser = null; + private String clusterUser = null; @Option(name = "--cluster-password", description = "The cluster password to use for clustering. 
(Default: input)") - String clusterPassword = null; + private String clusterPassword = null; @Option(name = "--encoding", description = "The encoding that text files should use") - String encoding = "UTF-8"; + private String encoding = "UTF-8"; @Option(name = "--java-options", description = "Extra java options to be passed to the profile") - String javaOptions = ""; + private String javaOptions = ""; @Option(name = "--allow-anonymous", description = "Enables anonymous configuration on security, opposite of --require-login (Default: input)") - Boolean allowAnonymous = null; + private Boolean allowAnonymous = null; @Option(name = "--require-login", description = "This will configure security to require user / password, opposite of --allow-anonymous") - Boolean requireLogin = null; + private Boolean requireLogin = null; @Option(name = "--paging", description = "Page messages to disk when address becomes full, opposite of --blocking (Default: true)") - Boolean paging; + private Boolean paging; @Option(name = "--blocking", description = "Block producers when address becomes full, opposite of --paging (Default: false)") - Boolean blocking; + private Boolean blocking; @Option(name = "--no-autotune", description = "Disable auto tuning on the journal.") - boolean noAutoTune; + private boolean noAutoTune; @Option(name = "--no-autocreate", description = "Disable Auto create addresses.") - Boolean noAutoCreate; + private Boolean noAutoCreate; @Option(name = "--autocreate", description = "Auto create addresses. 
(default: true)") - Boolean autoCreate; + private Boolean autoCreate; @Option(name = "--user", description = "The username (Default: input)") - String user; + private String user; @Option(name = "--password", description = "The user's password (Default: input)") - String password; + private String password; @Option(name = "--role", description = "The name for the role created (Default: amq)") - String role = "amq"; + private String role = "amq"; @Option(name = "--no-web", description = "Remove the web-server definition from bootstrap.xml") - boolean noWeb; + private boolean noWeb; @Option(name = "--queues", description = "Comma separated list of queues.") - String queues; + private String queues; @Option(name = "--addresses", description = "Comma separated list of addresses ") - String addresses; + private String addresses; @Option(name = "--aio", description = "Sets the journal as asyncio.") - boolean aio; + private boolean aio; @Option(name = "--nio", description = "Sets the journal as nio.") - boolean nio; + private boolean nio; @Option(name = "--mapped", description = "Sets the journal as mapped.") - boolean mapped; + private boolean mapped; // this is used by the setupJournalType method private JournalType journalType; @Option(name = "--disable-persistence", description = "Disable message persistence to the journal") - boolean disablePersistence; + private boolean disablePersistence; @Option(name = "--no-amqp-acceptor", description = "Disable the AMQP specific acceptor.") - boolean noAmqpAcceptor; + private boolean noAmqpAcceptor; @Option(name = "--no-mqtt-acceptor", description = "Disable the MQTT specific acceptor.") - boolean noMqttAcceptor; + private boolean noMqttAcceptor; @Option(name = "--no-stomp-acceptor", description = "Disable the STOMP specific acceptor.") - boolean noStompAcceptor; + private boolean noStompAcceptor; @Option(name = "--no-hornetq-acceptor", description = "Disable the HornetQ specific acceptor.") - boolean noHornetQAcceptor; + 
private boolean noHornetQAcceptor; @Option(name = "--no-fsync", description = "Disable usage of fdatasync (channel.force(false) from java nio) on the journal") - boolean noJournalSync; + private boolean noJournalSync; @Option(name = "--global-max-size", description = "Maximum amount of memory which message data may consume (Default: Undefined, half of the system's memory)") - String globalMaxSize; + private String globalMaxSize; - boolean IS_WINDOWS; + private boolean IS_WINDOWS; + private boolean IS_CYGWIN; - boolean IS_CYGWIN; - - public int getMaxHops() { - return maxHops; - } - - public void setMaxHops(int maxHops) { - this.maxHops = maxHops; - } - - public boolean isNoWeb() { - return noWeb; - } - - public void setNoWeb(boolean noWeb) { - this.noWeb = noWeb; - } - - public int getPortOffset() { - return portOffset; - } - - public void setPortOffset(int portOffset) { - this.portOffset = portOffset; - } - - public MessageLoadBalancingType getMessageLoadBalancing() { - return messageLoadBalancing; - } - - public void setMessageLoadBalancing(MessageLoadBalancingType messageLoadBalancing) { - this.messageLoadBalancing = messageLoadBalancing; - } - - public Boolean getAutoCreate() { - return autoCreate; - } - - public Create setAutoCreate(Boolean autoCreate) { - this.autoCreate = autoCreate; - return this; - } - - public Boolean getNoAutoCreate() { - return noAutoCreate; - } - - public Create setNoAutoCreate(Boolean noAutoCreate) { - this.noAutoCreate = noAutoCreate; - return this; - } - - public boolean isAutoCreate() { + private boolean isAutoCreate() { if (autoCreate == null) { if (noAutoCreate != null) { autoCreate = !noAutoCreate.booleanValue(); @@ -325,14 +274,6 @@ public class Create extends InputAbstract { return autoCreate; } - public String getJavaOptions() { - return javaOptions; - } - - public void setJavaOptions(String javaOptions) { - this.javaOptions = javaOptions; - } - public File getInstance() { return directory; } @@ -348,7 +289,7 @@ public class 
Create extends InputAbstract { return host; } - public String getHostForClustered() { + private String getHostForClustered() { if (getHost().equals("0.0.0.0")) { host = input("--host", "Host " + host + " is not valid for clustering, please provide a valid IP or hostname", "localhost"); } @@ -379,14 +320,6 @@ public class Create extends InputAbstract { this.home = home; } - public boolean isClustered() { - return clustered; - } - - public void setClustered(boolean clustered) { - this.clustered = clustered; - } - public boolean isReplicated() { return replicated; } @@ -399,10 +332,6 @@ public class Create extends InputAbstract { return sharedStore; } - public void setSharedStore(boolean sharedStore) { - this.sharedStore = sharedStore; - } - public String getEncoding() { return encoding; } @@ -419,50 +348,42 @@ public class Create extends InputAbstract { this.data = data; } - public String getClusterUser() { + private String getClusterUser() { if (clusterUser == null) { clusterUser = input("--cluster-user", "Please provide the username:", "cluster-admin"); } return clusterUser; } - public void setClusterUser(String clusterUser) { - this.clusterUser = clusterUser; - } - - public String getClusterPassword() { + private String getClusterPassword() { if (clusterPassword == null) { clusterPassword = inputPassword("--cluster-password", "Please enter the password:", "password-admin"); } return clusterPassword; } - public String getSslKeyPassword() { + private String getSslKeyPassword() { if (sslKeyPassword == null) { sslKeyPassword = inputPassword("--ssl-key-password", "Please enter the keystore password:", "password"); } return sslKeyPassword; } - public String getSslTrust() { + private String getSslTrust() { if (sslTrust == null) { sslTrust = input("--ssl-trust", "Please enter the trust store path:", "/etc/truststore.jks"); } return sslTrust; } - public String getSslTrustPassword() { + private String getSslTrustPassword() { if (sslTrustPassword == null) { sslTrustPassword 
= inputPassword("--ssl-key-password", "Please enter the keystore password:", "password"); } return sslTrustPassword; } - public void setClusterPassword(String clusterPassword) { - this.clusterPassword = clusterPassword; - } - - public boolean isAllowAnonymous() { + private boolean isAllowAnonymous() { if (allowAnonymous == null) { allowAnonymous = inputBoolean("--allow-anonymous | --require-login", "Allow anonymous access?", true); } @@ -473,21 +394,6 @@ public class Create extends InputAbstract { return paging; } - public void setAllowAnonymous(boolean allowAnonymous) { - this.allowAnonymous = Boolean.valueOf(allowAnonymous); - } - - public Boolean getRequireLogin() { - if (requireLogin == null) { - requireLogin = !isAllowAnonymous(); - } - return requireLogin; - } - - public void setRequireLogin(Boolean requireLogin) { - this.requireLogin = requireLogin; - } - public String getPassword() { if (password == null) { @@ -522,45 +428,18 @@ public class Create extends InputAbstract { this.role = role; } - public String getGlobalMaxSize() { - return globalMaxSize; - } - public void setGlobalMaxSize(String globalMaxSize) { - this.globalMaxSize = globalMaxSize; - } - - public boolean isSlave() { + private boolean isSlave() { return slave; } - public void setSlave(boolean slave) { - this.slave = slave; - } - - public boolean isFailoverOnShutodwn() { + private boolean isFailoverOnShutodwn() { return failoverOnShutodwn; } - public void setFailoverOnShutodwn(boolean failoverOnShutodwn) { - this.failoverOnShutodwn = failoverOnShutodwn; - } - - public Boolean getAllowAnonymous() { - return allowAnonymous; - } - - public void setAllowAnonymous(Boolean allowAnonymous) { - this.allowAnonymous = allowAnonymous; - } - - public boolean isDisablePersistence() { + private boolean isDisablePersistence() { return disablePersistence; } - public void setDisablePersistence(boolean disablePersistence) { - this.disablePersistence = disablePersistence; - } - @Override public Object 
execute(ActionContext context) throws Exception { this.checkDirectory(); @@ -589,6 +468,7 @@ public class Create extends InputAbstract { throw new RuntimeException(String.format("The path '%s' is not writable.", directory)); } } + public Object run(ActionContext context) throws Exception { IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win"); @@ -860,7 +740,6 @@ public class Create extends InputAbstract { } } - private static int countBoolean(boolean...b) { int count = 0; @@ -890,23 +769,6 @@ public class Create extends InputAbstract { } /** - * It will create the jms configurations - */ - private void applyJMSObjects(HashMap<String, String> filters) { - StringWriter writer = new StringWriter(); - PrintWriter printWriter = new PrintWriter(writer); - printWriter.println(); - - for (String str : getQueueList()) { - printWriter.println(" <queue name=\"" + str + "\"/>"); - } - for (String str : getAddressList()) { - printWriter.println(" <topic name=\"" + str + "\"/>"); - } - filters.put("${jms-list.settings}", writer.toString()); - } - - /** * It will create the address and queue configurations */ private void applyAddressesAndQueues(HashMap<String, String> filters) { @@ -1004,7 +866,7 @@ public class Create extends InputAbstract { } } - String path(String value, boolean unixPaths) throws IOException { + private String path(String value, boolean unixPaths) throws IOException { return path(new File(value), unixPaths); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/HelpAction.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/HelpAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/HelpAction.java index a154494..ebbacdd 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/HelpAction.java +++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/HelpAction.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.cli.commands; import java.io.File; import io.airlift.airline.Help; -import org.apache.activemq.artemis.util.OptionsUtil; public class HelpAction extends Help implements Action { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/InputAbstract.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/InputAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/InputAbstract.java index 4d1fe35..d4535be 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/InputAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/InputAbstract.java @@ -37,15 +37,10 @@ public class InputAbstract extends ActionAbstract { @Option(name = "--silent", description = "It will disable all the inputs, and it would make a best guess for any required input") private boolean silentInput = false; - public boolean isSilentInput() { + private boolean isSilentInput() { return silentInput || !inputEnabled; } - public void setSilentInput(boolean silentInput) { - this.silentInput = silentInput; - } - - protected boolean inputBoolean(String propertyName, String prompt, boolean silentDefault) { if (isSilentInput()) { return silentDefault; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Mask.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Mask.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Mask.java index d90536e..e8a778f 100644 --- 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Mask.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Mask.java @@ -16,17 +16,16 @@ */ package org.apache.activemq.artemis.cli.commands; +import java.io.File; +import java.util.HashMap; +import java.util.Map; + import io.airlift.airline.Arguments; import io.airlift.airline.Command; import io.airlift.airline.Option; -import org.apache.activemq.artemis.util.OptionsUtil; import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; import org.apache.activemq.artemis.utils.PasswordMaskingUtil; -import java.io.File; -import java.util.HashMap; -import java.util.Map; - @Command(name = "mask", description = "mask a password and print it out") public class Mask implements Action { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/OptionsUtil.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/OptionsUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/OptionsUtil.java new file mode 100644 index 0000000..926236a --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/OptionsUtil.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Set; + +import io.airlift.airline.Option; + +public class OptionsUtil { + + private static void findAllOptions(Set<String> options, Class<? extends Action> command) { + for (Field field : command.getDeclaredFields()) { + if (field.isAnnotationPresent(Option.class)) { + Option annotation = field.getAnnotation(Option.class); + String[] names = annotation.name(); + for (String n : names) { + options.add(n); + } + } + } + Class parent = command.getSuperclass(); + if (Action.class.isAssignableFrom(parent)) { + findAllOptions(options, parent); + } + } + + private static Set<String> findCommandOptions(Class<? extends Action> command) { + Set<String> options = new HashSet<>(); + findAllOptions(options, command); + + return options; + } + + public static void checkCommandOptions(Class<? 
extends Action> cmdClass, String[] options) throws InvalidOptionsError { + Set<String> definedOptions = OptionsUtil.findCommandOptions(cmdClass); + for (String opt : options) { + if (opt.startsWith("--") && !"--".equals(opt.trim())) { + int index = opt.indexOf("="); + if (index > 0) { + opt = opt.substring(0, index); + } + if (!definedOptions.contains(opt)) { + throw new InvalidOptionsError("Found unexpected parameters: [" + opt + "]"); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java index 02478a1..a299b18 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Run.java @@ -22,13 +22,13 @@ import java.util.TimerTask; import io.airlift.airline.Command; import io.airlift.airline.Option; -import org.apache.activemq.artemis.cli.Artemis; import org.apache.activemq.artemis.cli.commands.tools.LockAbstract; +import org.apache.activemq.artemis.cli.commands.tools.PrintData; +import org.apache.activemq.artemis.cli.factory.BrokerFactory; +import org.apache.activemq.artemis.cli.factory.security.SecurityManagerFactory; import org.apache.activemq.artemis.components.ExternalComponent; import org.apache.activemq.artemis.dto.BrokerDTO; import org.apache.activemq.artemis.dto.ComponentDTO; -import org.apache.activemq.artemis.factory.BrokerFactory; -import org.apache.activemq.artemis.factory.SecurityManagerFactory; import org.apache.activemq.artemis.integration.Broker; import org.apache.activemq.artemis.integration.bootstrap.ActiveMQBootstrapLogger; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; @@ 
-40,7 +40,7 @@ public class Run extends LockAbstract { @Option(name = "--allow-kill", description = "This will allow the server to kill itself. Useful for tests (failover tests for instance)") boolean allowKill; - static boolean embedded = false; + private static boolean embedded = false; public static final ReusableLatch latchRunning = new ReusableLatch(0); @@ -60,7 +60,7 @@ public class Run extends LockAbstract { public Object execute(ActionContext context) throws Exception { super.execute(context); - Artemis.printBanner(); + PrintData.printBanner(); BrokerDTO broker = getBrokerDTO(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/HelpAddress.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/HelpAddress.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/HelpAddress.java index c89621e..0dcd20f 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/HelpAddress.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/HelpAddress.java @@ -25,7 +25,7 @@ import io.airlift.airline.Help; import org.apache.activemq.artemis.cli.commands.Action; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.InvalidOptionsError; -import org.apache.activemq.artemis.util.OptionsUtil; +import org.apache.activemq.artemis.cli.commands.OptionsUtil; public class HelpAddress extends Help implements Action { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java index 66336b3..5f03c82 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java @@ -24,7 +24,6 @@ import javax.jms.Session; import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.cli.commands.util.ConsumerThread; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java index 1af9fac..5a2a8b6 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java @@ -24,7 +24,6 @@ import javax.jms.Session; import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.cli.commands.util.ConsumerThread; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java ---------------------------------------------------------------------- diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java new file mode 100644 index 0000000..b69e618 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.cli.commands.messages; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; + +public class ConsumerThread extends Thread { + + int messageCount = 1000; + int receiveTimeOut = 3000; + Destination destination; + Session session; + boolean durable; + boolean breakOnNull = true; + int sleep; + int batchSize; + boolean verbose; + boolean browse; + + String filter; + + int received = 0; + int transactions = 0; + boolean running = false; + CountDownLatch finished; + boolean bytesAsText; + + public ConsumerThread(Session session, Destination destination, int threadNr) { + super("Consumer " + destination.toString() + ", thread=" + threadNr); + this.destination = destination; + this.session = session; + } + + @Override + public void run() { + if (browse) { + browse(); + } else { + consume(); + } + } + + public void browse() { + running = true; + QueueBrowser consumer = null; + String threadName = Thread.currentThread().getName(); + System.out.println(threadName + " wait until " + messageCount + " messages are consumed"); + try { + if (filter != null) { + consumer = session.createBrowser((Queue) destination, filter); + } else { + consumer = session.createBrowser((Queue) destination); + } + Enumeration<Message> enumBrowse = consumer.getEnumeration(); + + while (enumBrowse.hasMoreElements()) { + Message msg = enumBrowse.nextElement(); + if (msg != null) { + System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); + + if (verbose) { + System.out.println("..." 
+ msg); + } + if (bytesAsText && (msg instanceof BytesMessage)) { + long length = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int) length]; + ((BytesMessage) msg).readBytes(bytes); + System.out.println("Message:" + msg); + } + received++; + + if (received >= messageCount) { + break; + } + } else { + break; + } + + if (sleep > 0) { + Thread.sleep(sleep); + } + + } + + consumer.close(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (finished != null) { + finished.countDown(); + } + if (consumer != null) { + System.out.println(threadName + " Consumed: " + this.getReceived() + " messages"); + try { + consumer.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + + System.out.println(threadName + " Consumer thread finished"); + } + + public void consume() { + running = true; + MessageConsumer consumer = null; + String threadName = Thread.currentThread().getName(); + System.out.println(threadName + " wait until " + messageCount + " messages are consumed"); + try { + if (durable && destination instanceof Topic) { + if (filter != null) { + consumer = session.createDurableSubscriber((Topic) destination, getName(), filter, false); + } else { + consumer = session.createDurableSubscriber((Topic) destination, getName()); + } + } else { + if (filter != null) { + consumer = session.createConsumer(destination, filter); + } else { + consumer = session.createConsumer(destination); + } + } + while (running && received < messageCount) { + Message msg = consumer.receive(receiveTimeOut); + if (msg != null) { + System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); + if (verbose) { + System.out.println("..." 
+ msg); + } + if (bytesAsText && (msg instanceof BytesMessage)) { + long length = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int) length]; + ((BytesMessage) msg).readBytes(bytes); + System.out.println("Message:" + msg); + } + received++; + } else { + if (breakOnNull) { + break; + } + } + + if (session.getTransacted()) { + if (batchSize > 0 && received > 0 && received % batchSize == 0) { + System.out.println(threadName + " Committing transaction: " + transactions++); + session.commit(); + } + } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { + if (batchSize > 0 && received > 0 && received % batchSize == 0) { + System.out.println("Acknowledging last " + batchSize + " messages; messages so far = " + received); + msg.acknowledge(); + } + } + if (sleep > 0) { + Thread.sleep(sleep); + } + + } + + try { + session.commit(); + } catch (Throwable ignored) { + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (finished != null) { + finished.countDown(); + } + if (consumer != null) { + System.out.println(threadName + " Consumed: " + this.getReceived() + " messages"); + try { + consumer.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + + System.out.println(threadName + " Consumer thread finished"); + } + + public int getReceived() { + return received; + } + + public boolean isDurable() { + return durable; + } + + public ConsumerThread setDurable(boolean durable) { + this.durable = durable; + return this; + } + + public ConsumerThread setMessageCount(int messageCount) { + this.messageCount = messageCount; + return this; + } + + public ConsumerThread setBreakOnNull(boolean breakOnNull) { + this.breakOnNull = breakOnNull; + return this; + } + + public int getBatchSize() { + return batchSize; + } + + public ConsumerThread setBatchSize(int batchSize) { + this.batchSize = batchSize; + return this; + } + + public int getMessageCount() { + return messageCount; + } + + public boolean 
isBreakOnNull() { + return breakOnNull; + } + + public int getReceiveTimeOut() { + return receiveTimeOut; + } + + public ConsumerThread setReceiveTimeOut(int receiveTimeOut) { + this.receiveTimeOut = receiveTimeOut; + return this; + } + + public boolean isRunning() { + return running; + } + + public ConsumerThread setRunning(boolean running) { + this.running = running; + return this; + } + + public int getSleep() { + return sleep; + } + + public ConsumerThread setSleep(int sleep) { + this.sleep = sleep; + return this; + } + + public CountDownLatch getFinished() { + return finished; + } + + public ConsumerThread setFinished(CountDownLatch finished) { + this.finished = finished; + return this; + } + + public boolean isBytesAsText() { + return bytesAsText; + } + + public boolean isVerbose() { + return verbose; + } + + public ConsumerThread setVerbose(boolean verbose) { + this.verbose = verbose; + return this; + } + + public ConsumerThread setBytesAsText(boolean bytesAsText) { + this.bytesAsText = bytesAsText; + return this; + } + + public String getFilter() { + return filter; + } + + public ConsumerThread setFilter(String filter) { + this.filter = filter; + return this; + } + + public boolean isBrowse() { + return browse; + } + + public ConsumerThread setBrowse(boolean browse) { + this.browse = browse; + return this; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java index 4ec68ca..f7c11ae 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java @@ -24,7 
+24,6 @@ import javax.jms.Session; import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.cli.commands.util.ProducerThread; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java new file mode 100644 index 0000000..9a4c1a7 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.cli.commands.messages; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.utils.ReusableLatch; + +public class ProducerThread extends Thread { + + protected final Session session; + + boolean verbose; + int messageCount = 1000; + boolean runIndefinitely = false; + Destination destination; + int sleep = 0; + boolean persistent = true; + int messageSize = 0; + int textMessageSize; + long msgTTL = 0L; + String msgGroupID = null; + int transactionBatchSize; + + int transactions = 0; + final AtomicInteger sentCount = new AtomicInteger(0); + String message; + String messageText = null; + String payloadUrl = null; + byte[] payload = null; + boolean running = false; + final ReusableLatch finished = new ReusableLatch(1); + final ReusableLatch paused = new ReusableLatch(0); + + public ProducerThread(Session session, Destination destination, int threadNr) { + super("Producer " + destination.toString() + ", thread=" + threadNr); + this.destination = destination; + this.session = session; + } + + @Override + public void run() { + MessageProducer producer = null; + String threadName = Thread.currentThread().getName(); + try { + producer = session.createProducer(destination); + producer.setDeliveryMode(persistent ? 
DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + producer.setTimeToLive(msgTTL); + initPayLoad(); + running = true; + + System.out.println(threadName + " Started to calculate elapsed time ...\n"); + long tStart = System.currentTimeMillis(); + + if (runIndefinitely) { + while (running) { + paused.await(); + sendMessage(producer, threadName); + sentCount.incrementAndGet(); + } + } else { + for (sentCount.set(0); sentCount.get() < messageCount && running; sentCount.incrementAndGet()) { + paused.await(); + sendMessage(producer, threadName); + } + } + + try { + session.commit(); + } catch (Throwable ignored) { + } + + System.out.println(threadName + " Produced: " + this.getSentCount() + " messages"); + long tEnd = System.currentTimeMillis(); + long elapsed = (tEnd - tStart) / 1000; + System.out.println(threadName + " Elapsed time in second : " + elapsed + " s"); + System.out.println(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds"); + + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (finished != null) { + finished.countDown(); + } + if (producer != null) { + try { + producer.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + } + + private void sendMessage(MessageProducer producer, String threadName) throws Exception { + Message message = createMessage(sentCount.get(), threadName); + producer.send(message); + if (verbose) { + System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? 
((TextMessage) message).getText() : message.getJMSMessageID())); + } + + if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() % transactionBatchSize == 0) { + System.out.println(threadName + " Committing transaction: " + transactions++); + session.commit(); + } + + if (sleep > 0) { + Thread.sleep(sleep); + } + } + + private void initPayLoad() { + if (messageSize > 0) { + payload = new byte[messageSize]; + for (int i = 0; i < payload.length; i++) { + payload[i] = '.'; + } + } + } + + protected Message createMessage(int i, String threadName) throws Exception { + Message answer; + if (payload != null) { + answer = session.createBytesMessage(); + ((BytesMessage) answer).writeBytes(payload); + } else { + if (textMessageSize > 0) { + if (messageText == null) { + messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i); + } + } else if (payloadUrl != null) { + messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i); + } else if (message != null) { + messageText = message; + } else { + messageText = createDefaultMessage(i); + } + answer = session.createTextMessage(messageText); + } + if ((msgGroupID != null) && (!msgGroupID.isEmpty())) { + answer.setStringProperty("JMSXGroupID", msgGroupID); + } + + answer.setIntProperty("count", i); + answer.setStringProperty("ThreadSent", threadName); + return answer; + } + + private String readInputStream(InputStream is, int size, int messageNumber) throws IOException { + try (InputStreamReader reader = new InputStreamReader(is)) { + char[] buffer; + if (size > 0) { + buffer = new char[size]; + } else { + buffer = new char[1024]; + } + int count; + StringBuilder builder = new StringBuilder(); + while ((count = reader.read(buffer)) != -1) { + builder.append(buffer, 0, count); + if (size > 0) + break; + } + return builder.toString(); + } catch (IOException ioe) { + return createDefaultMessage(messageNumber); + } + } + + private String createDefaultMessage(int 
messageNumber) { + return "test message: " + messageNumber; + } + + public ProducerThread setMessageCount(int messageCount) { + this.messageCount = messageCount; + return this; + } + + public int getSleep() { + return sleep; + } + + public ProducerThread setSleep(int sleep) { + this.sleep = sleep; + return this; + } + + public int getMessageCount() { + return messageCount; + } + + public int getSentCount() { + return sentCount.get(); + } + + public boolean isPersistent() { + return persistent; + } + + public ProducerThread setPersistent(boolean persistent) { + this.persistent = persistent; + return this; + } + + public boolean isRunning() { + return running; + } + + public ProducerThread setRunning(boolean running) { + this.running = running; + return this; + } + + public long getMsgTTL() { + return msgTTL; + } + + public ProducerThread setMsgTTL(long msgTTL) { + this.msgTTL = msgTTL; + return this; + } + + public int getTransactionBatchSize() { + return transactionBatchSize; + } + + public ProducerThread setTransactionBatchSize(int transactionBatchSize) { + this.transactionBatchSize = transactionBatchSize; + return this; + } + + public String getMsgGroupID() { + return msgGroupID; + } + + public ProducerThread setMsgGroupID(String msgGroupID) { + this.msgGroupID = msgGroupID; + return this; + } + + public int getTextMessageSize() { + return textMessageSize; + } + + public ProducerThread setTextMessageSize(int textMessageSize) { + this.textMessageSize = textMessageSize; + return this; + } + + public int getMessageSize() { + return messageSize; + } + + public ProducerThread setMessageSize(int messageSize) { + this.messageSize = messageSize; + return this; + } + + public ReusableLatch getFinished() { + return finished; + } + + public ProducerThread setFinished(int value) { + finished.setCount(value); + return this; + } + + public String getPayloadUrl() { + return payloadUrl; + } + + public ProducerThread setPayloadUrl(String payloadUrl) { + this.payloadUrl = 
payloadUrl; + return this; + } + + public String getMessage() { + return message; + } + + public ProducerThread setMessage(String message) { + this.message = message; + return this; + } + + public boolean isRunIndefinitely() { + return runIndefinitely; + } + + public ProducerThread setRunIndefinitely(boolean runIndefinitely) { + this.runIndefinitely = runIndefinitely; + return this; + } + + public ProducerThread pauseProducer() { + this.paused.countUp(); + return this; + } + + public ProducerThread resumeProducer() { + this.paused.countDown(); + return this; + } + + public ProducerThread resetCounters() { + this.sentCount.set(0); + return this; + } + + public boolean isVerbose() { + return verbose; + } + + public ProducerThread setVerbose(boolean verbose) { + this.verbose = verbose; + return this; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/HelpQueue.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/HelpQueue.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/HelpQueue.java index f106a18..dc2bd45 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/HelpQueue.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/HelpQueue.java @@ -25,7 +25,7 @@ import io.airlift.airline.Help; import org.apache.activemq.artemis.cli.commands.Action; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.InvalidOptionsError; -import org.apache.activemq.artemis.util.OptionsUtil; +import org.apache.activemq.artemis.cli.commands.OptionsUtil; public class HelpQueue extends Help implements Action { 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java deleted file mode 100644 index 2959828..0000000 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.cli.commands.tools; - -import java.io.File; - -import io.airlift.airline.Command; -import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; -import org.apache.activemq.artemis.core.journal.impl.JournalImpl; - -@Command(name = "compact", description = "Compacts the journal of a non running server") -public final class CompactJournal extends LockAbstract { - - @Override - public Object execute(ActionContext context) throws Exception { - super.execute(context); - try { - Configuration configuration = getFileConfiguration(); - compactJournal(new File(getJournal()), "activemq-data", "amq", configuration.getJournalMinFiles(), configuration.getJournalFileSize(), null); - System.out.println("Compactation succeeded for " + getJournal()); - compactJournal(new File(getBinding()), "activemq-bindings", "bindings", 2, 1048576, null); - System.out.println("Compactation succeeded for " + getBinding()); - - } catch (Exception e) { - treatError(e, "data", "compact"); - } - return null; - } - - void compactJournal(final File directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final IOCriticalErrorListener listener) throws Exception { - NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1); - - JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); - - journal.start(); - - journal.loadInternalOnly(); - - journal.compact(); - - journal.stop(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f82623a2/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java ---------------------------------------------------------------------- diff 
--git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java deleted file mode 100644 index f290eba..0000000 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.cli.commands.tools; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -import io.airlift.airline.Command; -import io.airlift.airline.Option; -import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; -import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.utils.Base64; - -@Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files") -public class DecodeJournal extends LockAbstract { - - @Option(name = "--directory", description = "The journal folder (default journal folder from broker.xml)") - public String directory; - - @Option(name = "--prefix", description = "The journal prefix (default activemq-data)") - public String prefix = "activemq-data"; - - @Option(name = "--suffix", description = "The journal suffix (default amq)") - public String suffix = "amq"; - - @Option(name = "--file-size", description = "The journal size (default 10485760)") - public int size = 10485760; - - @Option(name = "--input", description = "The input file name (default=exp.dmp)", required = true) - public String input = "exp.dmp"; - - @Override - public Object execute(ActionContext context) throws Exception { - super.execute(context); - try { - if (directory == null) { - directory = getFileConfiguration().getJournalDirectory(); - } - importJournal(directory, prefix, suffix, 2, size, input); - } catch (Exception e) { - treatError(e, "data", "decode"); - } - - return null; - } - - public static void importJournal(final String directory, - final String 
journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final String fileInput) throws Exception { - FileInputStream fileInputStream = new FileInputStream(new File(fileInput)); - importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream); - - } - - public static void importJournal(final String directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final InputStream stream) throws Exception { - Reader reader = new InputStreamReader(stream); - importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader); - } - - public static void importJournal(final String directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final Reader reader) throws Exception { - - File journalDir = new File(directory); - - if (!journalDir.exists()) { - if (!journalDir.mkdirs()) - System.err.println("Could not create directory " + directory); - } - - NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1); - - JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); - - if (journal.orderFiles().size() != 0) { - throw new IllegalStateException("Import needs to create a brand new journal"); - } - - journal.start(); - - // The journal is empty, as we checked already. 
Calling load just to initialize the internal data - journal.loadInternalOnly(); - - BufferedReader buffReader = new BufferedReader(reader); - - String line; - - HashMap<Long, AtomicInteger> txCounters = new HashMap<>(); - - long lineNumber = 0; - - while ((line = buffReader.readLine()) != null) { - lineNumber++; - String[] splitLine = line.split(","); - if (splitLine[0].equals("#File")) { - txCounters.clear(); - continue; - } - - Properties lineProperties = parseLine(splitLine); - - String operation = null; - try { - operation = lineProperties.getProperty("operation"); - - if (operation.equals("AddRecord")) { - RecordInfo info = parseRecord(lineProperties); - journal.appendAddRecord(info.id, info.userRecordType, info.data, false); - } else if (operation.equals("AddRecordTX")) { - long txID = parseLong("txID", lineProperties); - AtomicInteger counter = getCounter(txID, txCounters); - counter.incrementAndGet(); - RecordInfo info = parseRecord(lineProperties); - journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); - } else if (operation.equals("UpdateTX")) { - long txID = parseLong("txID", lineProperties); - AtomicInteger counter = getCounter(txID, txCounters); - counter.incrementAndGet(); - RecordInfo info = parseRecord(lineProperties); - journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data); - } else if (operation.equals("Update")) { - RecordInfo info = parseRecord(lineProperties); - journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false); - } else if (operation.equals("DeleteRecord")) { - long id = parseLong("id", lineProperties); - - try { - journal.appendDeleteRecord(id, false); - } catch (IllegalStateException ignored) { - // If not found it means the append/update records were reclaimed already - } - } else if (operation.equals("DeleteRecordTX")) { - long txID = parseLong("txID", lineProperties); - long id = parseLong("id", lineProperties); - AtomicInteger counter = 
getCounter(txID, txCounters); - counter.incrementAndGet(); - journal.appendDeleteRecordTransactional(txID, id); - } else if (operation.equals("Prepare")) { - long txID = parseLong("txID", lineProperties); - int numberOfRecords = parseInt("numberOfRecords", lineProperties); - AtomicInteger counter = getCounter(txID, txCounters); - byte[] data = parseEncoding("extraData", lineProperties); - - if (counter.get() == numberOfRecords) { - journal.appendPrepareRecord(txID, data, false); - } else { - System.err.println("Transaction " + txID + - " at line " + - lineNumber + - " is incomplete. The prepare record expected " + - numberOfRecords + - " while the import only had " + - counter); - } - } else if (operation.equals("Commit")) { - long txID = parseLong("txID", lineProperties); - int numberOfRecords = parseInt("numberOfRecords", lineProperties); - AtomicInteger counter = getCounter(txID, txCounters); - if (counter.get() == numberOfRecords) { - journal.appendCommitRecord(txID, false); - } else { - System.err.println("Transaction " + txID + - " at line " + - lineNumber + - " is incomplete. 
The commit record expected " + - numberOfRecords + - " while the import only had " + - counter); - } - } else if (operation.equals("Rollback")) { - long txID = parseLong("txID", lineProperties); - journal.appendRollbackRecord(txID, false); - } else { - System.err.println("Invalid operation " + operation + " at line " + lineNumber); - } - } catch (Exception ex) { - System.err.println("Error at line " + lineNumber + ", operation=" + operation + " msg = " + ex.getMessage()); - } - } - - journal.stop(); - } - - protected static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger> txCounters) { - - AtomicInteger counter = txCounters.get(txID); - if (counter == null) { - counter = new AtomicInteger(0); - txCounters.put(txID, counter); - } - - return counter; - } - - protected static RecordInfo parseRecord(final Properties properties) throws Exception { - long id = parseLong("id", properties); - byte userRecordType = parseByte("userRecordType", properties); - boolean isUpdate = parseBoolean("isUpdate", properties); - byte[] data = parseEncoding("data", properties); - return new RecordInfo(id, userRecordType, data, isUpdate, (short) 0); - } - - private static byte[] parseEncoding(final String name, final Properties properties) throws Exception { - String value = parseString(name, properties); - - return decode(value); - } - - /** - * @param properties - * @return - */ - private static int parseInt(final String name, final Properties properties) throws Exception { - String value = parseString(name, properties); - - return Integer.parseInt(value); - } - - private static long parseLong(final String name, final Properties properties) throws Exception { - String value = parseString(name, properties); - - return Long.parseLong(value); - } - - private static boolean parseBoolean(final String name, final Properties properties) throws Exception { - String value = parseString(name, properties); - - return Boolean.parseBoolean(value); - } - - private static byte 
parseByte(final String name, final Properties properties) throws Exception { - String value = parseString(name, properties); - - return Byte.parseByte(value); - } - - /** - * @param name - * @param properties - * @return - * @throws Exception - */ - private static String parseString(final String name, final Properties properties) throws Exception { - String value = properties.getProperty(name); - - if (value == null) { - throw new Exception("property " + name + " not found"); - } - return value; - } - - protected static Properties parseLine(final String[] splitLine) { - Properties properties = new Properties(); - - for (String el : splitLine) { - String[] tuple = el.split("@"); - if (tuple.length == 2) { - properties.put(tuple[0], tuple[1]); - } else { - properties.put(tuple[0], tuple[0]); - } - } - - return properties; - } - - private static byte[] decode(final String data) { - return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); - } - - public void printUsage() { - for (int i = 0; i < 10; i++) { - System.err.println(); - } - System.err.println("This method will export the journal at low level record."); - System.err.println(); - System.err.println(); - for (int i = 0; i < 10; i++) { - System.err.println(); - } - } - -}
