Repository: karaf Updated Branches: refs/heads/OPENSSH 35b7c123f -> 3b7aef862 (forced update)
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/JmsService.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/JmsService.java b/jms/src/main/java/org/apache/karaf/jms/JmsService.java new file mode 100644 index 0000000..0902e33 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/JmsService.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms; + +import java.util.List; +import java.util.Map; + +/** + * JMS Service. + */ +public interface JmsService { + + /** + * List the JMS connection factories. + * + * @return The {@link List} of JMS connection factory names. + * @throws Exception If the service fails. + */ + List<String> connectionFactories() throws Exception; + + /** + * List the JMS connection factories file names. + * + * @return The {@link List} of JMS connection factory file names. + * @throws Exception If the service fails. + */ + List<String> connectionFactoryFileNames() throws Exception; + + /** + * Create a new JMS connection factory. + * + * @param name The JMS connection factory name. 
+ * @param type The JMS connection factory type (ActiveMQ, WebsphereMQ, ...). + * @param url The JMS URL to use. + * @throws Exception If the service fails. + */ + void create(String name, String type, String url) throws Exception; + + /** + * Create a new JMS connection factory. + * + * @param name The JMS connection factory name. + * @param type The JMS connection factory type (ActiveMQ, WebsphereMQ, ...). + * @param url The JMS URL to use. + * @param username The username to use. + * @param password The password to use. + * @throws Exception If the service fails. + */ + void create(String name, String type, String url, String username, String password) throws Exception; + + /** + * Delete a JMS connection factory. + * + * @param name The JMS connection factory name. + * @throws Exception If the service fails. + */ + void delete(String name) throws Exception; + + /** + * Get details about a given JMS connection factory. + * + * @param connectionFactory The JMS connection factory name. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return A {@link Map} (property/value) containing details. + * @throws Exception If the service fails. + */ + Map<String, String> info(String connectionFactory, String username, String password) throws Exception; + + /** + * Count the number of messages in a JMS queue. + * + * @param connectionFactory The JMS connection factory name. + * @param queue The queue name. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The number of messages in a JMS queue. + * @throws Exception If the service fails. + */ + int count(String connectionFactory, String queue, String username, String password) throws Exception; + + /** + * List the queues. + * + * @param connectionFactory The JMS connection factory name. 
+ * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The {@link List} of queues. + * @throws Exception If the service fails. + */ + List<String> queues(String connectionFactory, String username, String password) throws Exception; + + /** + * List the topics. + * + * @param connectionFactory The JMS connection factory name. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The {@link List} of topics. + * @throws Exception If the service fails. + */ + List<String> topics(String connectionFactory, String username, String password) throws Exception; + + /** + * Browse a destination. + * + * @param connectionFactory The JMS connection factory name. + * @param queue The queue name. + * @param selector The selector. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The {@link List} of messages. + * @throws Exception If the service fails. + */ + List<JmsMessage> browse(String connectionFactory, String queue, String selector, String username, String password) throws Exception; + + /** + * Send a message on the given queue. + * + * @param connectionFactory The JMS connection factory name. + * @param queue The queue name. + * @param body The message body. + * @param replyTo The message replyTo header. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @throws Exception If the service fails. + */ + void send(String connectionFactory, String queue, String body, String replyTo, String username, String password) throws Exception; + + /** + * Consume messages from a given destination. + * + * @param connectionFactory The JMS connection factory name. 
+ * @param queue The queue name. + * @param selector The messages selector. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The number of messages consumed. + * @throws Exception If the service fails. + */ + int consume(String connectionFactory, String queue, String selector, String username, String password) throws Exception; + + /** + * Move messages from a destination to another. + * + * @param connectionFactory The JMS connection factory name. + * @param sourceQueue The source queue. + * @param targetQueue The target queue. + * @param selector The messages selector on the source queue. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The number of messages moved. + * @throws Exception If the service fails. + */ + int move(String connectionFactory, String sourceQueue, String targetQueue, String selector, String username, String password) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/BrowseCommand.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/BrowseCommand.java b/jms/src/main/java/org/apache/karaf/jms/command/BrowseCommand.java new file mode 100644 index 0000000..cb86aa6 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/BrowseCommand.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.command; + +import java.util.List; + +import org.apache.karaf.jms.JmsMessage; +import org.apache.karaf.shell.api.action.Argument; +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.Option; +import org.apache.karaf.shell.api.action.lifecycle.Service; +import org.apache.karaf.shell.support.table.ShellTable; + +@Command(scope = "jms", name = "browse", description = "Browse a JMS queue") +@Service +public class BrowseCommand extends JmsConnectionCommandSupport { + + @Argument(index = 1, name = "queue", description = "The JMS queue to browse", required = true, multiValued = false) + String queue; + + @Option(name = "-s", aliases = { "--selector" }, description = "The selector to select the messages to browse", required = false, multiValued = false) + String selector; + + @Option(name = "-v", aliases = { "--verbose" }, description = "Display JMS properties", required = false, multiValued = false) + boolean verbose = false; + + @Override + public Object execute() throws Exception { + + ShellTable table = new ShellTable(); + table.column("Message ID"); + table.column("Content").maxSize(80); + table.column("Charset"); + table.column("Type"); + table.column("Correlation ID"); + table.column("Delivery Mode"); + table.column("Destination"); + table.column("Expiration"); + table.column("Priority"); + table.column("Redelivered"); + table.column("ReplyTo"); + table.column("Timestamp"); + if (verbose) { + table.column("Properties"); + } + + List<JmsMessage> messages = 
getJmsService().browse(connectionFactory, queue, selector, username, password); + for (JmsMessage message : messages) { + if (verbose) { + StringBuilder properties = new StringBuilder(); + for (String property : message.getProperties().keySet()) { + properties.append(property).append("=").append(message.getProperties().get(property)).append("\n"); + } + table.addRow().addContent( + message.getMessageId(), + message.getContent(), + message.getCharset(), + message.getType(), + message.getCorrelationID(), + message.getDeliveryMode(), + message.getDestination(), + message.getExpiration(), + message.getPriority(), + message.isRedelivered(), + message.getReplyTo(), + message.getTimestamp(), + properties.toString()); + } else { + table.addRow().addContent( + message.getMessageId(), + message.getContent(), + message.getCharset(), + message.getType(), + message.getCorrelationID(), + message.getDeliveryMode(), + message.getDestination(), + message.getExpiration(), + message.getPriority(), + message.isRedelivered(), + message.getReplyTo(), + message.getTimestamp()); + } + } + + table.print(System.out); + + return null; + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/ConnectionFactoriesCommand.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/ConnectionFactoriesCommand.java b/jms/src/main/java/org/apache/karaf/jms/command/ConnectionFactoriesCommand.java new file mode 100644 index 0000000..b698336 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/ConnectionFactoriesCommand.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.command; + +import java.util.List; + +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.lifecycle.Service; +import org.apache.karaf.shell.support.table.ShellTable; + +@Command(scope = "jms", name = "connectionfactories", description = "List the JMS connection factories") +@Service +public class ConnectionFactoriesCommand extends JmsCommandSupport { + + @Override + public Object execute() throws Exception { + + ShellTable table = new ShellTable(); + table.column("JMS Connection Factory"); + + List<String> connectionFactories = getJmsService().connectionFactories(); + for (String connectionFactory : connectionFactories) { + table.addRow().addContent(connectionFactory); + } + + table.print(System.out); + + return null; + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/ConsumeCommand.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/ConsumeCommand.java b/jms/src/main/java/org/apache/karaf/jms/command/ConsumeCommand.java new file mode 100644 index 0000000..cd8caaf --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/ConsumeCommand.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.command; + +import org.apache.karaf.shell.api.action.Argument; +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.Option; +import org.apache.karaf.shell.api.action.lifecycle.Service; + +@Command(scope = "jms", name = "consume", description = "Consume messages from a JMS queue.") +@Service +public class ConsumeCommand extends JmsConnectionCommandSupport { + + @Argument(index = 1, name = "queue", description = "The JMS queue where to consume messages", required = true, multiValued = false) + String queue; + + @Option(name = "-s", aliases = { "--selector" }, description = "The selector to use to select the messages to consume", required = false, multiValued = false) + String selector; + + @Override + public Object execute() throws Exception { + System.out.println(getJmsService().consume(connectionFactory, queue, selector, username, password) + " message(s) consumed"); + return null; + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/CountCommand.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/CountCommand.java 
b/jms/src/main/java/org/apache/karaf/jms/command/CountCommand.java new file mode 100644 index 0000000..576e8dd --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/CountCommand.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.karaf.jms.command; + + +import org.apache.karaf.shell.api.action.Argument; +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.lifecycle.Service; +import org.apache.karaf.shell.support.table.ShellTable; + +@Command(scope = "jms", name = "count", description = "Count the number of messages on a JMS queue.") +@Service +public class CountCommand extends JmsConnectionCommandSupport { + + @Argument(index = 1, name = "queue", description = "The JMS queue name", required = true, multiValued = false) + String queue; + + @Override + public Object execute() throws Exception { + ShellTable table = new ShellTable(); + table.column("Messages Count"); + table.addRow().addContent(getJmsService().count(connectionFactory, queue, username, password)); + table.print(System.out); + return null; + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/CreateCommand.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/CreateCommand.java b/jms/src/main/java/org/apache/karaf/jms/command/CreateCommand.java new file mode 100644 index 0000000..64ccf02 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/CreateCommand.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.command; + +import org.apache.karaf.shell.api.action.Argument; +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.Completion; +import org.apache.karaf.shell.api.action.Option; +import org.apache.karaf.shell.api.action.lifecycle.Service; +import org.apache.karaf.shell.support.completers.StringsCompleter; + +@Command(scope = "jms", name = "create", description = "Create a JMS connection factory.") +@Service +public class CreateCommand extends JmsCommandSupport { + + @Argument(index = 0, name = "name", description = "The JMS connection factory name", required = true, multiValued = false) + String name; + + @Option(name = "-t", aliases = { "--type" }, description = "The JMS connection factory type (ActiveMQ, Artemis or WebsphereMQ)", required = false, multiValued = false) + @Completion(value = StringsCompleter.class, values = { "activemq", "artemis", "webspheremq" }) + String type = "ActiveMQ"; + + @Option(name = "--url", description = "URL of the JMS broker. 
For WebsphereMQ type, the URL is hostname/port/queuemanager/channel", required = false, multiValued = false) + String url = "tcp://localhost:61616"; + + @Option(name = "-u", aliases = { "--username" }, description = "Username to connect to the JMS broker", required = false, multiValued = false) + String username = "karaf"; + + @Option(name = "-p", aliases = { "--password" }, description = "Password to connect to the JMS broker", required = false, multiValued = false) + String password = "karaf"; + + @Override + public Object execute() throws Exception { + getJmsService().create(name, type, url, username, password); + return null; + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/DeleteCommand.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/DeleteCommand.java b/jms/src/main/java/org/apache/karaf/jms/command/DeleteCommand.java new file mode 100644 index 0000000..cab3123 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/DeleteCommand.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.karaf.jms.command; + + +import org.apache.karaf.jms.command.completers.ConnectionFactoriesFileNameCompleter; +import org.apache.karaf.shell.api.action.Argument; +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.Completion; +import org.apache.karaf.shell.api.action.lifecycle.Service; + +@Command(scope = "jms", name = "delete", description = "Delete a JMS connection factory") +@Service +public class DeleteCommand extends JmsCommandSupport { + + @Argument(index = 0, name = "name", description = "The JMS connection factory name", required = true, multiValued = false) + @Completion(ConnectionFactoriesFileNameCompleter.class) + String name; + + @Override + public Object execute() throws Exception { + getJmsService().delete(name); + return null; + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/InfoCommand.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/InfoCommand.java b/jms/src/main/java/org/apache/karaf/jms/command/InfoCommand.java new file mode 100644 index 0000000..354db39 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/InfoCommand.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.command; + + +import java.util.Map; + +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.lifecycle.Service; +import org.apache.karaf.shell.support.table.ShellTable; + +@Command(scope = "jms", name = "info", description = "Provides details about a JMS connection factory.") +@Service +public class InfoCommand extends JmsConnectionCommandSupport { + + @Override + public Object execute() throws Exception { + ShellTable table = new ShellTable(); + table.column("Property"); + table.column("Value"); + + Map<String, String> info = getJmsService().info(connectionFactory, username, password); + for (Map.Entry<String, String> entry : info.entrySet()) { + table.addRow().addContent(entry.getKey(), entry.getValue()); + } + + table.print(System.out); + + return null; + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/JmsCommandSupport.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/JmsCommandSupport.java b/jms/src/main/java/org/apache/karaf/jms/command/JmsCommandSupport.java new file mode 100644 index 0000000..2f5df8f --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/JmsCommandSupport.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.command; + +import org.apache.karaf.jms.JmsService; +import org.apache.karaf.shell.api.action.Action; +import org.apache.karaf.shell.api.action.lifecycle.Reference; + +public abstract class JmsCommandSupport implements Action { + + @Reference + private JmsService jmsService; + + public JmsService getJmsService() { + return jmsService; + } + + public void setJmsService(JmsService jmsService) { + this.jmsService = jmsService; + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/JmsConnectionCommandSupport.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/JmsConnectionCommandSupport.java b/jms/src/main/java/org/apache/karaf/jms/command/JmsConnectionCommandSupport.java new file mode 100644 index 0000000..64adfe4 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/JmsConnectionCommandSupport.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.command; + +import org.apache.karaf.jms.command.completers.ConnectionFactoriesNameCompleter; +import org.apache.karaf.shell.api.action.Argument; +import org.apache.karaf.shell.api.action.Completion; +import org.apache.karaf.shell.api.action.Option; + +/** + * Base class for commands that take a JMS connection factory name (argument 0) plus optional -u/--username and -p/--password options, both defaulting to "karaf". + */ +public abstract class JmsConnectionCommandSupport extends JmsCommandSupport { + + @Argument(index = 0, name = "connectionFactory", description = "The JMS connection factory name", required = true, multiValued = false) + @Completion(ConnectionFactoriesNameCompleter.class) + String connectionFactory; + + @Option(name = "-u", aliases = { "--username" }, description = "Username to connect to the JMS broker", required = false, multiValued = false) + String username = "karaf"; + + @Option(name = "-p", aliases = { "--password" }, description = "Password to connect to the JMS broker", required = false, multiValued = false) + String password = "karaf"; + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/MoveCommand.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/MoveCommand.java b/jms/src/main/java/org/apache/karaf/jms/command/MoveCommand.java new file mode 100644 index 0000000..a4c8d12 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/MoveCommand.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.command; + + +import org.apache.karaf.shell.api.action.Argument; +import org.apache.karaf.shell.api.action.Command; +import org.apache.karaf.shell.api.action.Option; +import org.apache.karaf.shell.api.action.lifecycle.Service; + +@Command(scope = "jms", name = "move", description = "Move messages from one JMS queue to another one.") +@Service +public class MoveCommand extends JmsConnectionCommandSupport { + + @Argument(index = 1, name = "source", description = "The source JMS queue", required = true, multiValued = false) + String source; + + @Argument(index = 2, name = "destination", description = "The destination JMS queue", required = true, multiValued = false) + String destination; + + @Option(name = "-s", aliases = { "--selector" }, description = "Selector to move only some messages", required = false, multiValued = false) + String selector; + + @Override + public Object execute() throws Exception { + System.out.println(getJmsService().move(connectionFactory, source, destination, selector, username, password) + " message(s) moved"); + return null; + } + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/QueuesCommand.java 
---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/QueuesCommand.java b/jms/src/main/java/org/apache/karaf/jms/command/QueuesCommand.java new file mode 100644 index 0000000..7cf1dac --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/QueuesCommand.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.karaf.jms.command;

import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.apache.karaf.shell.support.table.ShellTable;

/**
 * Shell command {@code jms:queues}: prints the queue names reported by the JMS
 * service for the selected connection factory as a single-column table.
 */
@Command(scope = "jms", name = "queues", description = "List the JMS queues.")
@Service
public class QueuesCommand extends JmsConnectionCommandSupport {

    /**
     * Builds the table, one row per queue name, and prints it to standard output.
     *
     * @return always {@code null}
     * @throws Exception if the JMS service fails
     */
    @Override
    public Object execute() throws Exception {
        ShellTable output = new ShellTable();
        output.column("JMS Queues");

        getJmsService()
                .queues(connectionFactory, username, password)
                .forEach(name -> output.addRow().addContent(name));

        output.print(System.out);
        return null;
    }

}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/SendCommand.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/SendCommand.java b/jms/src/main/java/org/apache/karaf/jms/command/SendCommand.java new file mode 100644 index 0000000..63d3f4a --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/SendCommand.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.karaf.jms.command;

import org.apache.karaf.shell.api.action.Argument;
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.Option;
import org.apache.karaf.shell.api.action.lifecycle.Service;

/**
 * Shell command {@code jms:send}: sends a message with the given content to a
 * JMS queue, optionally setting its ReplyTo queue.
 */
// The command description used to be the truncated text "Send a message to ";
// it is completed here so that the shell help reads as a full sentence.
@Command(scope = "jms", name = "send", description = "Send a message to a JMS queue.")
@Service
public class SendCommand extends JmsConnectionCommandSupport {

    /** Name of the queue the message is sent to. */
    @Argument(index = 1, name = "queue", description = "The JMS queue name", required = true, multiValued = false)
    String queue;

    /** Content of the message. */
    @Argument(index = 2, name = "message", description = "The JMS message content", required = true, multiValued = false)
    String message;

    /** Optional ReplyTo queue name; passed through as {@code null} when the option is not given. */
    @Option(name = "-r", aliases = { "--replyTo" }, description = "Set the message ReplyTo", required = false, multiValued = false)
    String replyTo;

    /**
     * Delegates the send to the JMS service.
     *
     * @return always {@code null}
     * @throws Exception if the JMS service fails
     */
    @Override
    public Object execute() throws Exception {
        getJmsService().send(connectionFactory, queue, message, replyTo, username, password);
        return null;
    }

}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/TopicsCommand.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/TopicsCommand.java b/jms/src/main/java/org/apache/karaf/jms/command/TopicsCommand.java new file mode 100644 index 0000000..b583bc4 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/TopicsCommand.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.karaf.jms.command;

import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.apache.karaf.shell.support.table.ShellTable;

/**
 * Shell command {@code jms:topics}: prints the topic names reported by the JMS
 * service for the selected connection factory as a single-column table.
 */
@Command(scope = "jms", name = "topics", description = "List the JMS topics.")
@Service
public class TopicsCommand extends JmsConnectionCommandSupport {

    /**
     * Builds the table, one row per topic name, and prints it to standard output.
     *
     * @return always {@code null}
     * @throws Exception if the JMS service fails
     */
    @Override
    public Object execute() throws Exception {
        ShellTable output = new ShellTable();
        output.column("JMS Topics");

        getJmsService()
                .topics(connectionFactory, username, password)
                .forEach(name -> output.addRow().addContent(name));

        output.print(System.out);
        return null;
    }

}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/completers/ConnectionFactoriesFileNameCompleter.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/completers/ConnectionFactoriesFileNameCompleter.java b/jms/src/main/java/org/apache/karaf/jms/command/completers/ConnectionFactoriesFileNameCompleter.java new file mode 100644 index 0000000..c33ff62 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/completers/ConnectionFactoriesFileNameCompleter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.command.completers; + +import java.util.List; + +import org.apache.karaf.jms.JmsService; +import org.apache.karaf.shell.api.action.lifecycle.Reference; +import org.apache.karaf.shell.api.action.lifecycle.Service; +import org.apache.karaf.shell.api.console.CommandLine; +import org.apache.karaf.shell.api.console.Completer; +import org.apache.karaf.shell.api.console.Session; +import org.apache.karaf.shell.support.completers.StringsCompleter; + +/** + * Completer on the JMS connection factory file names. 
+ */
@Service
public class ConnectionFactoriesFileNameCompleter implements Completer {

    /** Leading part of a connection factory file name that is not part of the factory name. */
    private static final String FILE_PREFIX = "connectionfactory-";

    /** Trailing part of a connection factory file name that is not part of the factory name. */
    private static final String FILE_SUFFIX = ".xml";

    @Reference
    private JmsService jmsService;

    /**
     * Offers the connection factory names derived from the connection factory
     * file names known to the JMS service.
     *
     * @param session     the shell session, passed through to the delegate completer
     * @param commandLine the command line being completed, passed through to the delegate completer
     * @param candidates  the list the delegate completer fills with proposals
     * @return the value returned by the delegate {@link StringsCompleter}
     */
    @Override
    public int complete(Session session, CommandLine commandLine, List<String> candidates) {
        StringsCompleter delegate = new StringsCompleter();
        try {
            for (String fileName : jmsService.connectionFactoryFileNames()) {
                delegate.getStrings().add(toFactoryName(fileName));
            }
        } catch (Exception ignored) {
            // Completion is best effort: when the service fails, whatever was
            // collected so far (possibly nothing) is offered.
        }
        return delegate.complete(session, commandLine, candidates);
    }

    /**
     * Strips the leading {@code connectionfactory-} and the trailing {@code .xml}
     * from a file name. Only the prefix and the suffix are removed, so a factory
     * name that itself contains one of those substrings is left intact.
     */
    private static String toFactoryName(String fileName) {
        String name = fileName;
        if (name.startsWith(FILE_PREFIX)) {
            name = name.substring(FILE_PREFIX.length());
        }
        if (name.endsWith(FILE_SUFFIX)) {
            name = name.substring(0, name.length() - FILE_SUFFIX.length());
        }
        return name;
    }

    public JmsService getJmsService() {
        return jmsService;
    }

    public void setJmsService(JmsService jmsService) {
        this.jmsService = jmsService;
    }

}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/command/completers/ConnectionFactoriesNameCompleter.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/command/completers/ConnectionFactoriesNameCompleter.java b/jms/src/main/java/org/apache/karaf/jms/command/completers/ConnectionFactoriesNameCompleter.java new file mode 100644 index 0000000..2fd8d2a --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/command/completers/ConnectionFactoriesNameCompleter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.karaf.jms.command.completers;

import java.util.List;

import org.apache.karaf.jms.JmsService;
import org.apache.karaf.shell.api.action.lifecycle.Reference;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.apache.karaf.shell.api.console.CommandLine;
import org.apache.karaf.shell.api.console.Completer;
import org.apache.karaf.shell.api.console.Session;
import org.apache.karaf.shell.support.completers.StringsCompleter;

/**
 * Completer proposing the names of the JMS connection factories known to the
 * JMS service.
 */
@Service
public class ConnectionFactoriesNameCompleter implements Completer {

    @Reference
    private JmsService jmsService;

    /**
     * Registers every connection factory name, followed by a single space, with
     * a {@link StringsCompleter} and lets it perform the completion.
     *
     * @return the value returned by the delegate completer
     */
    @Override
    public int complete(Session session, CommandLine commandLine, List<String> candidates) {
        StringsCompleter completer = new StringsCompleter();
        try {
            jmsService.connectionFactories()
                    .forEach(name -> completer.getStrings().add(name + " "));
        } catch (Exception e) {
            // nothing to do
        }
        return completer.complete(session, commandLine, candidates);
    }

    public JmsService getJmsService() {
        return this.jmsService;
    }

    public void setJmsService(JmsService jmsService) {
        this.jmsService = jmsService;
    }

}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/internal/ActiveMQDestinationSourceFactory.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/internal/ActiveMQDestinationSourceFactory.java
b/jms/src/main/java/org/apache/karaf/jms/internal/ActiveMQDestinationSourceFactory.java new file mode 100644 index 0000000..5d87d01 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/internal/ActiveMQDestinationSourceFactory.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.karaf.jms.internal;

import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * {@link DestinationSource.Factory} for providers that report the name
 * "ActiveMQ" and a provider version starting with "5.". Destination names are
 * collected from the messages received on the {@code ActiveMQ.Advisory.Queue}
 * or {@code ActiveMQ.Advisory.Topic} topic.
 */
class ActiveMQDestinationSourceFactory implements DestinationSource.Factory {

    /**
     * @param context the JMS context whose provider metadata is inspected
     * @return a destination source bound to {@code context}, or {@code null}
     *         when the provider does not match or its metadata cannot be read
     */
    @Override
    public DestinationSource create(JMSContext context) {
        try {
            ConnectionMetaData cmd = context.getMetaData();
            if ("ActiveMQ".equals(cmd.getJMSProviderName()) && cmd.getProviderVersion().startsWith("5.")) {
                return type -> getNames(context, type);
            }
        } catch (Throwable ignored) {
            // Any failure while probing the provider means "not handled by this factory".
        }
        return null;
    }

    /**
     * Reads the advisory topic for the requested destination type until no
     * message arrives within 100 ms, and returns the destination names found.
     *
     * @return the collected names, or an empty list when anything fails
     */
    private List<String> getNames(JMSContext context, DestinationSource.DestinationType type) {
        try {
            List<String> names = new ArrayList<>();
            context.start();
            String dest = "ActiveMQ.Advisory."
                    + (type == DestinationSource.DestinationType.Queue ? "Queue" : "Topic");
            try (JMSConsumer consumer = context.createConsumer(context.createTopic(dest))) {
                Message message;
                while ((message = consumer.receive(100)) != null) {
                    // The destination is read reflectively from the provider's message object.
                    Object destination = getField(message, "super.dataStructure", "destination");
                    if (destination instanceof Queue) {
                        names.add(((Queue) destination).getQueueName());
                    } else if (destination instanceof Topic) {
                        names.add(((Topic) destination).getTopicName());
                    }
                    // Anything else is skipped, so that a single unexpected payload
                    // does not throw away the names collected so far.
                }
                return names;
            }
        } catch (Exception ignored) {
            // Listing is best effort: a failure yields an empty result.
            return Collections.emptyList();
        }
    }

    /**
     * Follows a chain of (possibly private) fields starting at {@code context}.
     * Each leading {@code "super."} in a field name moves the lookup one class
     * up the hierarchy before the field is resolved.
     *
     * @throws NoSuchFieldException   if a field, or a requested superclass, does not exist
     * @throws IllegalAccessException if a field cannot be read
     */
    private static Object getField(Object context, String... fields) throws NoSuchFieldException, IllegalAccessException {
        Object obj = context;
        for (String field : fields) {
            Class<?> cl = obj.getClass();
            while (field.startsWith("super.")) {
                cl = cl.getSuperclass();
                if (cl == null) {
                    // Report a missing superclass as the declared exception rather than an NPE.
                    throw new NoSuchFieldException(field);
                }
                field = field.substring("super.".length());
            }
            Field f = cl.getDeclaredField(field);
            f.setAccessible(true);
            obj = f.get(obj);
        }
        return obj;
    }

}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java b/jms/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java new file mode 100644 index 0000000..b81bf64 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.karaf.jms.internal;

import org.apache.karaf.util.json.JsonReader;

import javax.jms.ConnectionMetaData;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.TextMessage;
import java.io.StringReader;
import java.util.Collections;
import java.util.List;

/**
 * {@link DestinationSource.Factory} for providers that report the name
 * "ActiveMQ" and a provider version starting with "2.". Destination names are
 * obtained by sending a {@code getQueueNames} request to the
 * {@code activemq.management} queue and reading the JSON reply.
 */
class ArtemisDestinationSourceFactory implements DestinationSource.Factory {

    /**
     * @param context the JMS context whose provider metadata is inspected
     * @return a destination source bound to {@code context}, or {@code null}
     *         when the provider does not match or its metadata cannot be read
     */
    @Override
    public DestinationSource create(JMSContext context) {
        try {
            ConnectionMetaData cmd = context.getMetaData();
            if ("ActiveMQ".equals(cmd.getJMSProviderName()) && cmd.getProviderVersion().startsWith("2.")) {
                return type -> getNames(context, type);
            }
        } catch (Throwable ignored) {
            // Any failure while probing the provider means "not handled by this factory".
        }
        return null;
    }

    /**
     * Asks the broker for the queue names of the routing type matching
     * {@code type} ("ANYCAST" for queues, "MULTICAST" for topics) and waits up
     * to 500 ms for the reply.
     *
     * @return the names from the reply, or an empty list when no usable reply
     *         arrives or anything fails
     */
    private List<String> getNames(JMSContext context, DestinationSource.DestinationType type) {
        try {
            Queue managementQueue = context.createQueue("activemq.management");
            Queue replyTo = context.createTemporaryQueue();

            context.start();

            String routing = type == DestinationSource.DestinationType.Queue ? "ANYCAST" : "MULTICAST";
            context.createProducer()
                    .setProperty("_AMQ_ResourceName", "broker")
                    .setProperty("_AMQ_OperationName", "getQueueNames")
                    .setJMSReplyTo(replyTo)
                    .send(managementQueue, "[\"" + routing + "\"]");
            try (JMSConsumer consumer = context.createConsumer(replyTo)) {
                Message reply = consumer.receive(500);
                // receive() returns null on timeout; handle that (and a non-text
                // reply) explicitly instead of relying on an exception being caught.
                if (!(reply instanceof TextMessage)) {
                    return Collections.emptyList();
                }
                String json = ((TextMessage) reply).getText();
                List<?> array = (List<?>) JsonReader.read(new StringReader(json));
                // NOTE(review): assumes the first element of the reply array is the
                // list of names as strings - confirm against the broker's management API.
                @SuppressWarnings("unchecked")
                List<String> names = (List<String>) array.get(0);
                return names;
            }
        } catch (Exception e) {
            // Listing is best effort: a failure yields an empty result.
            return Collections.emptyList();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java b/jms/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java new file mode 100644 index 0000000..0593bc4 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.karaf.jms.internal;

import javax.jms.Connection;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import java.util.List;

/**
 * Source of destination names for one kind of destination.
 */
interface DestinationSource {

    /**
     * Lists the names of the destinations of the given kind.
     *
     * @param type the kind of destination to list
     * @return the destination names
     */
    List<String> getNames(DestinationType type);

    /** The kinds of destination that can be listed. */
    enum DestinationType {
        Queue,
        Topic
    }

    /** Creates a {@link DestinationSource} for a JMS context. */
    interface Factory {

        /**
         * @param context the JMS context the source will work with
         * @return the destination source created for {@code context}
         */
        DestinationSource create(JMSContext context);
    }
}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java b/jms/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java new file mode 100644 index 0000000..e3b7801 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.karaf.jms.internal;

import org.apache.karaf.jms.JmsMBean;
import org.apache.karaf.jms.JmsMessage;
import org.apache.karaf.jms.JmsService;

import javax.management.MBeanException;
import javax.management.openmbean.*;
import java.util.List;
import java.util.Map;

/**
 * Default implementation of the JMS MBean. Every operation delegates to the
 * {@link JmsService} and reports any failure as an {@link MBeanException}.
 */
public class JmsMBeanImpl implements JmsMBean {

    /** Item names of the composite type describing one browsed message. */
    private static final String[] MESSAGE_ITEM_NAMES = {
            "id", "content", "charset", "type", "correlation", "delivery",
            "destination", "expiration", "priority", "redelivered", "replyto", "timestamp" };

    /** Item descriptions, in the same order as {@link #MESSAGE_ITEM_NAMES}. */
    private static final String[] MESSAGE_ITEM_DESCRIPTIONS = {
            "Message ID", "Content", "Charset", "Type", "Correlation ID", "Delivery Mode",
            "Destination", "Expiration Date", "Priority", "Redelivered", "Reply-To", "Timestamp" };

    private JmsService jmsService;

    @Override
    public List<String> getConnectionfactories() throws MBeanException {
        try {
            return jmsService.connectionFactories();
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    @Override
    public void create(String name, String type, String url) throws MBeanException {
        try {
            jmsService.create(name, type, url);
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    @Override
    public void create(String name, String type, String url, String username, String password) throws MBeanException {
        try {
            jmsService.create(name, type, url, username, password);
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    @Override
    public void delete(String name) throws MBeanException {
        try {
            jmsService.delete(name);
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    @Override
    public Map<String, String> info(String connectionFactory, String username, String password) throws MBeanException {
        try {
            return jmsService.info(connectionFactory, username, password);
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    @Override
    public int count(String connectionFactory, String queue, String username, String password) throws MBeanException {
        try {
            return jmsService.count(connectionFactory, queue, username, password);
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    @Override
    public List<String> queues(String connectionFactory, String username, String password) throws MBeanException {
        try {
            return jmsService.queues(connectionFactory, username, password);
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    @Override
    public List<String> topics(String connectionFactory, String username, String password) throws MBeanException {
        try {
            return jmsService.topics(connectionFactory, username, password);
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    @Override
    public void send(String connectionFactory, String queue, String content, String replyTo, String username, String password) throws MBeanException {
        try {
            jmsService.send(connectionFactory, queue, content, replyTo, username, password);
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    @Override
    public int consume(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException {
        try {
            return jmsService.consume(connectionFactory, queue, selector, username, password);
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    @Override
    public int move(String connectionFactory, String source, String destination, String selector, String username, String password) throws MBeanException {
        try {
            return jmsService.move(connectionFactory, source, destination, selector, username, password);
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    /**
     * Browses a queue and returns its messages as tabular data, one row per
     * message, indexed by the message id.
     *
     * @throws MBeanException if browsing or building the table fails
     */
    @Override
    public TabularData browse(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException {
        try {
            CompositeType type = new CompositeType("message", "JMS Message",
                    MESSAGE_ITEM_NAMES,
                    MESSAGE_ITEM_DESCRIPTIONS,
                    new OpenType<?>[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.STRING });
            TabularType tableType = new TabularType("messages", "JMS Messages", type, new String[]{ "id" });
            TabularData table = new TabularDataSupport(tableType);
            for (JmsMessage message : jmsService.browse(connectionFactory, queue, selector, username, password)) {
                CompositeData data = new CompositeDataSupport(type,
                        MESSAGE_ITEM_NAMES,
                        new Object[]{ message.getMessageId(), message.getContent(), message.getCharset(), message.getType(), message.getCorrelationID(), message.getDeliveryMode(), message.getDestination(), message.getExpiration(), message.getPriority(), message.isRedelivered(), message.getReplyTo(), message.getTimestamp() }
                );
                table.put(data);
            }
            return table;
        } catch (Throwable t) {
            throw failure(t);
        }
    }

    public JmsService getJmsService() {
        return jmsService;
    }

    public void setJmsService(JmsService jmsService) {
        this.jmsService = jmsService;
    }

    /**
     * Builds the {@link MBeanException} reported for a failed operation. Only
     * text is carried over; the throwable itself is not attached.
     * NOTE(review): presumably so that JMX clients do not need the provider's
     * exception classes - confirm. When the throwable has no message, its
     * {@code toString()} is used so the client still learns what went wrong.
     */
    private static MBeanException failure(Throwable t) {
        String message = t.getMessage();
        return new MBeanException(null, message != null ? message : t.toString());
    }

}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java b/jms/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java new file mode 100644 index 0000000..8dcbf94 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.internal; + +import org.apache.karaf.jms.JmsMessage; +import org.apache.karaf.jms.JmsService; +import org.ops4j.pax.jms.service.ConnectionFactoryFactory; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; +import org.osgi.service.cm.Configuration; +import org.osgi.service.cm.ConfigurationAdmin; + +import javax.jms.*; +import javax.jms.Queue; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.stream.Collectors; + +/** + * Default implementation of the JMS Service. 
+ */ +public class JmsServiceImpl implements JmsService { + + private BundleContext bundleContext; + private ConfigurationAdmin configAdmin; + private Path deployFolder; + + public JmsServiceImpl() { + deployFolder = Paths.get(System.getProperty("karaf.base"), "deploy"); + } + + @Override + public void create(String name, String type, String url) throws Exception { + create(name, type, url, null, null); + } + + @Override + public void create(String name, String type, String url, String username, String password) throws Exception { + if (type == null) { + throw new IllegalArgumentException("JMS connection factory type not known"); + } + + if (connectionFactories().contains(name)) { + throw new IllegalArgumentException("There is already a ConnectionFactory with the name " + name); + } + + Dictionary<String, String> properties = new Hashtable<>(); + properties.put("osgi.jndi.service.name", "jms/" + name); + properties.put(ConnectionFactoryFactory.JMS_CONNECTIONFACTORY_NAME, name); + properties.put(ConnectionFactoryFactory.JMS_CONNECTIONFACTORY_TYPE, type); + put(properties, ConnectionFactoryFactory.JMS_URL, url); + put(properties, ConnectionFactoryFactory.JMS_USER, username); + put(properties, ConnectionFactoryFactory.JMS_PASSWORD, password); + Configuration config = configAdmin.createFactoryConfiguration("org.ops4j.connectionfactory", null); + config.update(properties); + } + + private void put(Dictionary<String, String> properties, String key, String value) { + if (value != null) { + properties.put(key, value); + } + } + + @Override + public void delete(String name) throws Exception { + String filter = String.format("(&(service.factoryPid=org.ops4j.connectionfactory)(%s=%s))", ConnectionFactoryFactory.JMS_CONNECTIONFACTORY_NAME, name); + Configuration[] configs = configAdmin.listConfigurations(filter); + for (Configuration config : configs) { + config.delete(); + } + } + + @Override + public List<String> connectionFactories() throws Exception { + return 
bundleContext.getServiceReferences(ConnectionFactory.class, null).stream() + .map(this::getConnectionFactoryName) + .distinct() + .collect(Collectors.toList()); + } + + private String getConnectionFactoryName(ServiceReference<ConnectionFactory> reference) { + if (reference.getProperty("osgi.jndi.service.name") != null) { + return (String) reference.getProperty("osgi.jndi.service.name"); + } else if (reference.getProperty("name") != null) { + return (String) reference.getProperty("name"); + } else { + return reference.getProperty(Constants.SERVICE_ID).toString(); + } + } + + @Override + public List<String> connectionFactoryFileNames() throws Exception { + return Files.list(deployFolder) + .map(Path::getFileName) + .map(Path::toString) + .filter(name -> name.startsWith("connectionfactory-") && name.endsWith(".xml")) + .collect(Collectors.toList()); + } + + @Override + public Map<String, String> info(String connectionFactory, String username, String password) throws IOException, JMSException { + try (JMSContext context = createContext(connectionFactory, username, password)) { + ConnectionMetaData metaData = context.getMetaData(); + Map<String, String> map = new HashMap<>(); + map.put("product", metaData.getJMSProviderName()); + map.put("version", metaData.getProviderVersion()); + return map; + } + } + + @Override + public int count(String connectionFactory, final String destination, String username, String password) throws IOException, JMSException { + try (JMSContext context = createContext(connectionFactory, username, password)) { + try (QueueBrowser browser = context.createBrowser(context.createQueue(destination))) { + @SuppressWarnings("unchecked") + Enumeration<Message> enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + enumeration.nextElement(); + count++; + } + return count; + } + } + } + + private JMSContext createContext(String name, String username, String password) { + return createContext(name, username, 
password, JMSContext.AUTO_ACKNOWLEDGE); + } + + private JMSContext createContext(String name, String username, String password, int sessionMode) { + ServiceReference<ConnectionFactory> sr = lookupConnectionFactory(name); + ConnectionFactory cf = bundleContext.getService(sr); + try { + return cf.createContext(username, password, sessionMode); + } finally { + bundleContext.ungetService(sr); + } + } + + private ServiceReference<ConnectionFactory> lookupConnectionFactory(String name) { + try { + Collection<ServiceReference<ConnectionFactory>> references = bundleContext.getServiceReferences( + ConnectionFactory.class, + "(|(osgi.jndi.service.name=" + name + ")(name=" + name + ")(service.id=" + name + "))"); + return references.stream() + .sorted(Comparator.<ServiceReference<?>>naturalOrder().reversed()) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No JMS connection factory found for " + name)); + } catch (InvalidSyntaxException e) { + throw new RuntimeException("Error finding connection factory service " + name, e); + } + } + + private DestinationSource getDestinationSource(JMSContext context) throws JMSException { + List<DestinationSource.Factory> factories = Arrays.asList( + new ActiveMQDestinationSourceFactory(), + new ArtemisDestinationSourceFactory() + ); + DestinationSource source = null; + for (DestinationSource.Factory factory : factories) { + source = factory.create(context); + if (source != null) { + break; + } + } + if (source == null) { + source = d -> Collections.emptyList(); + } + return source; + } + + @Override + public List<String> queues(String connectionFactory, String username, String password) throws JMSException, IOException { + try (JMSContext context = createContext(connectionFactory, username, password)) { + return getDestinationSource(context).getNames(DestinationSource.DestinationType.Queue); + } + } + + @Override + public List<String> topics(String connectionFactory, String username, String password) throws IOException, 
JMSException { + try (JMSContext context = createContext(connectionFactory, username, password)) { + return getDestinationSource(context).getNames(DestinationSource.DestinationType.Topic); + } + } + + @Override + public List<JmsMessage> browse(String connectionFactory, final String queue, final String filter, + String username, String password) throws JMSException, IOException { + try (JMSContext context = createContext(connectionFactory, username, password)) { + try (QueueBrowser browser = context.createBrowser(context.createQueue(queue), filter)) { + List<JmsMessage> messages = new ArrayList<>(); + @SuppressWarnings("unchecked") + Enumeration<Message> enumeration = browser.getEnumeration(); + while (enumeration.hasMoreElements()) { + Message message = enumeration.nextElement(); + + messages.add(new JmsMessage(message)); + } + return messages; + } + } + } + + @Override + public void send(String connectionFactory, final String queue, final String body, final String replyTo, + String username, String password) throws IOException, JMSException { + try (JMSContext context = createContext(connectionFactory, username, password)) { + JMSProducer producer = context.createProducer(); + if (replyTo != null) { + producer.setJMSReplyTo(context.createQueue(replyTo)); + } + producer.send(context.createQueue(queue), body); + } + } + + @Override + public int consume(String connectionFactory, final String queue, final String selector, String username, + String password) throws Exception { + try (JMSContext context = createContext(connectionFactory, username, password)) { + try (JMSConsumer consumer = context.createConsumer(context.createQueue(queue), selector)) { + int count = 0; + Message message; + do { + message = consumer.receive(500L); + if (message != null) { + count++; + } + } while (message != null); + return count; + } + } + } + + @Override + public int move(String connectionFactory, final String sourceQueue, final String targetQueue, + final String selector, String 
username, String password) throws IOException, JMSException { + try (JMSContext context = createContext(connectionFactory, username, password, JMSContext.SESSION_TRANSACTED)) { + Queue source = context.createQueue(sourceQueue); + Queue target = context.createQueue(targetQueue); + try (JMSConsumer consumer = context.createConsumer(source, selector)) { + int count = 0; + while (true) { + Message message = consumer.receive(500L); + if (message != null) { + context.createProducer().send(target, message); + context.commit(); + count++; + } else { + break; + } + } + return count; + } + } + } + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } + + public void setConfigAdmin(ConfigurationAdmin configAdmin) { + this.configAdmin = configAdmin; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java b/jms/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java new file mode 100644 index 0000000..16ac799 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java @@ -0,0 +1,38 @@ +package org.apache.karaf.jms.internal.osgi; + +import org.apache.karaf.jms.JmsService; +import org.apache.karaf.jms.internal.JmsMBeanImpl; +import org.apache.karaf.jms.internal.JmsServiceImpl; +import org.apache.karaf.shell.api.console.CommandLoggingFilter; +import org.apache.karaf.shell.support.RegexCommandLoggingFilter; +import org.apache.karaf.util.tracker.BaseActivator; +import org.apache.karaf.util.tracker.annotation.ProvideService; +import org.apache.karaf.util.tracker.annotation.RequireService; +import org.apache.karaf.util.tracker.annotation.Services; +import org.osgi.service.cm.ConfigurationAdmin; + +@Services( + provides = 
@ProvideService(JmsService.class), + requires = @RequireService(ConfigurationAdmin.class) +) +public class Activator extends BaseActivator { + @Override + protected void doStart() throws Exception { + ConfigurationAdmin configurationAdmin = getTrackedService(ConfigurationAdmin.class); + + JmsServiceImpl service = new JmsServiceImpl(); + service.setBundleContext(bundleContext); + service.setConfigAdmin(configurationAdmin); + register(JmsService.class, service); + + JmsMBeanImpl mbean = new JmsMBeanImpl(); + mbean.setJmsService(service); + registerMBean(mbean, "type=jms"); + + RegexCommandLoggingFilter filter = new RegexCommandLoggingFilter(); + filter.addRegEx("create +.*?--password ([^ ]+)", 2); + filter.addRegEx("create +.*?-p ([^ ]+)", 2); + register(CommandLoggingFilter.class, filter); + + } +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/resources/OSGI-INF/bundle.info ---------------------------------------------------------------------- diff --git a/jms/src/main/resources/OSGI-INF/bundle.info b/jms/src/main/resources/OSGI-INF/bundle.info new file mode 100644 index 0000000..3624846 --- /dev/null +++ b/jms/src/main/resources/OSGI-INF/bundle.info @@ -0,0 +1,41 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# +h1. Synopsis + + ${project.name} + + ${project.description} + + Maven URL: + [mvn:${project.groupId}/${project.artifactId}/${project.version}] + +h1. Description + + This bundle is the core implementation of the JMS service support. + + The JMS service allows you to create connection factories, and send/browse/consume messages. + +h1. Commands + + The bundle contains the following commands: +\${command-list|jms|indent=8,list,cyan} + +h1. See also + + JMS - section of the Karaf User Guide http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f66ec96..53e1e81 100644 --- a/pom.xml +++ b/pom.xml @@ -242,7 +242,9 @@ <aries.transaction.blueprint.version>1.1.1</aries.transaction.blueprint.version> <aries.transaction.blueprint.version2>2.1.0</aries.transaction.blueprint.version2> <aries.util.version>1.1.3</aries.util.version> + <atomikos.version>4.0.4</atomikos.version> + <geronimo.transaction.manager.version>3.1.3</geronimo.transaction.manager.version> <guava.version>20.0</guava.version> <narayana.version>5.6.3.Final</narayana.version> <hibernate.annotations.common.version>3.3.0.ga</hibernate.annotations.common.version> @@ -273,7 +275,9 @@ <pax.url.version>2.5.2</pax.url.version> <pax.web.version>6.0.6</pax.web.version> <pax.tinybundle.version>2.1.1</pax.tinybundle.version> - <pax.jdbc.version>1.1.0</pax.jdbc.version> + <pax.jdbc.version>1.2.0-SNAPSHOT</pax.jdbc.version> + <pax.jms.version>0.0.1-SNAPSHOT</pax.jms.version> + <pax.transx.version>0.2.0-SNAPSHOT</pax.transx.version> <portlet-api.version>2.0</portlet-api.version> <slf4j.version>1.7.12</slf4j.version>
