removing vertx
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a98b280f Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a98b280f Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a98b280f Branch: refs/heads/artemis-1009 Commit: a98b280f1f63b53c2d76011a4068072f11db03c3 Parents: 6f496e7 Author: Clebert Suconic <[email protected]> Authored: Mon Feb 6 19:57:16 2017 -0500 Committer: Clebert Suconic <[email protected]> Committed: Wed Mar 1 10:54:45 2017 -0500 ---------------------------------------------------------------------- artemis-distribution/pom.xml | 5 - docs/user-manual/en/SUMMARY.md | 1 - docs/user-manual/en/vertx-integration.md | 88 --- examples/features/sub-modules/pom.xml | 1 - examples/features/sub-modules/vertx/pom.xml | 142 ---- examples/features/sub-modules/vertx/readme.html | 103 --- .../artemis/core/example/ExampleVerticle.java | 53 -- .../core/example/VertxConnectorExample.java | 103 --- .../main/resources/activemq/server0/broker.xml | 82 -- integration/activemq-vertx-integration/pom.xml | 144 ---- .../integration/vertx/ActiveMQVertxLogger.java | 55 -- .../vertx/IncomingVertxEventHandler.java | 265 ------- .../vertx/OutgoingVertxEventHandler.java | 290 ------- .../integration/vertx/VertxConstants.java | 82 -- .../VertxIncomingConnectorServiceFactory.java | 51 -- .../VertxOutgoingConnectorServiceFactory.java | 49 -- pom.xml | 6 - tests/integration-tests/pom.xml | 38 - .../integration/mqtt/imported/MQTTTest.java | 2 +- .../vertx/ActiveMQVertxUnitTest.java | 774 ------------------- 20 files changed, 1 insertion(+), 2333 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/artemis-distribution/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml index 1236f17..7d5cc49 100644 --- a/artemis-distribution/pom.xml +++ b/artemis-distribution/pom.xml @@ -78,11 +78,6 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>artemis-vertx-integration</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.activemq.rest</groupId> <artifactId>artemis-rest</artifactId> <version>${project.version}</version> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/docs/user-manual/en/SUMMARY.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md index 4030aa9..b612a5e 100644 --- a/docs/user-manual/en/SUMMARY.md +++ b/docs/user-manual/en/SUMMARY.md @@ -52,7 +52,6 @@ * [Apache Karaf](karaf.md) * [Spring Integration](spring-integration.md) * [AeroGear Integration](aerogear-integration.md) -* [VertX Integration](vertx-integration.md) * [CDI Integration](cdi-integration.md) * [Intercepting Operations](intercepting-operations.md) * [Protocols and Interoperability](protocols-interoperability.md) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/docs/user-manual/en/vertx-integration.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/vertx-integration.md b/docs/user-manual/en/vertx-integration.md deleted file mode 100644 index f823f11..0000000 --- a/docs/user-manual/en/vertx-integration.md +++ /dev/null @@ -1,88 +0,0 @@ -# Vert.x Integration - -[Vert.x](http://vertx.io/) is a lightweight, high performance -application platform for the JVM that's designed for modern mobile, web, -and enterprise applications. Vert.x provides a distributed event bus -that allows messages to be sent across vert.x instances and clients. You -can now redirect and persist any vert.x messages to Apache ActiveMQ Artemis and route -those messages to a specified vertx address by configuring Apache ActiveMQ Artemis -vertx incoming and outgoing vertx connector services. - -## Configuring a Vertx Incoming Connector Service - -Vertx Incoming Connector services receive messages from vertx event bus -and route them to an Apache ActiveMQ Artemis queue. Such a service can be configured as -follows: - - <connector-service name="vertx-incoming-connector"> - <factory-class>org.apache.activemq.integration.vertx.VertxIncomingConnectorServiceFactory</factory-class> - <param key="host" value="127.0.0.1"/> - <param key="port" value="0"/> - <param key="queue" value="jms.queue.vertxQueue"/> - <param key="vertx-address" value="vertx.in.eventaddress"/> - </connector-service> - - -Shown are the required params for the connector service: - -- `queue`. The name of the Apache ActiveMQ Artemis queue to send message to. - -As well as these required parameters there are the following optional -parameters - -- `host`. The host name on which the vertx target container is - running. Default is localhost. - -- `port`. The port number to which the target vertx listens. Default - is zero. - -- `quorum-size`. The quorum size of the target vertx instance. - -- `ha-group`. The name of the ha-group of target vertx instance. - Default is `activemq`. - -- `vertx-address`. The vertx address to listen to. default is - `org.apache.activemq`. - -## Configuring a Vertx Outgoing Connector Service - -Vertx Outgoing Connector services fetch vertx messages from a ActiveMQ -queue and put them to vertx event bus. Such a service can be configured -as follows: - - <connector-service name="vertx-outgoing-connector"> - <factory-class>org.apache.activemq.integration.vertx.VertxOutgoingConnectorServiceFactory</factory-class> - <param key="host" value="127.0.0.1"/> - <param key="port" value="0"/> - <param key="queue" value="jms.queue.vertxQueue"/> - <param key="vertx-address" value="vertx.out.eventaddress"/> - <param key="publish" value="true"/> - </connector-service> - - -Shown are the required params for the connector service: - -- `queue`. The name of the Apache ActiveMQ Artemis queue to fetch message from. - -As well as these required parameters there are the following optional -parameters - -- `host`. The host name on which the vertx target container is - running. Default is localhost. - -- `port`. The port number to which the target vertx listens. Default - is zero. - -- `quorum-size`. The quorum size of the target vertx instance. - -- `ha-group`. The name of the ha-group of target vertx instance. - Default is `activemq`. - -- `vertx-address`. The vertx address to put messages to. default is - org.apache.activemq. - -- `publish`. How messages is sent to vertx event bus. "true" means - using publish style. "false" means using send style. Default is - false. - - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/examples/features/sub-modules/pom.xml ---------------------------------------------------------------------- diff --git a/examples/features/sub-modules/pom.xml b/examples/features/sub-modules/pom.xml index d59f915..ef47365 100644 --- a/examples/features/sub-modules/pom.xml +++ b/examples/features/sub-modules/pom.xml @@ -51,7 +51,6 @@ under the License. <modules> <module>aerogear</module> <module>artemis-ra-rar</module> - <module>vertx</module> </modules> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/examples/features/sub-modules/vertx/pom.xml ---------------------------------------------------------------------- diff --git a/examples/features/sub-modules/vertx/pom.xml b/examples/features/sub-modules/vertx/pom.xml deleted file mode 100644 index c09aada..0000000 --- a/examples/features/sub-modules/vertx/pom.xml +++ /dev/null @@ -1,142 +0,0 @@ -<?xml version='1.0'?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.activemq.examples.modules</groupId> - <artifactId>broker-modules</artifactId> - <version>2.0.0-SNAPSHOT</version> - </parent> - - <artifactId>artemis-vertx-example</artifactId> - <packaging>jar</packaging> - <name>ActiveMQ Artemis Vert.x Example</name> - - <properties> - <activemq.basedir>${project.basedir}/../../../..</activemq.basedir> - <vertx.version>2.1.2</vertx.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>artemis-server</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>artemis-core-client</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>artemis-commons</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - <version>${netty.version}</version> - </dependency> - <dependency> - <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-jms_2.0_spec</artifactId> - </dependency> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>vertx-core</artifactId> - <version>${vertx.version}</version> - </dependency> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>vertx-platform</artifactId> - <version>${vertx.version}</version> - </dependency> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>vertx-hazelcast</artifactId> - <version>${vertx.version}</version> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>artemis-vertx-integration</artifactId> - <version>${project.version}</version> - </dependency> - - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.activemq</groupId> - <artifactId>artemis-maven-plugin</artifactId> - <executions> - <execution> - <id>create0</id> - <goals> - <goal>create</goal> - </goals> - <configuration> - <libListWithDeps> - <arg>org.apache.activemq.examples.modules:artemis-vertx-example:${project.version}</arg> - </libListWithDeps> - - <instance>${basedir}/target/server0</instance> - <configuration>${basedir}/target/classes/activemq/server0</configuration> - </configuration> - </execution> - <execution> - <id>start0</id> - <goals> - <goal>cli</goal> - </goals> - <configuration> - <ignore>${noServer}</ignore> - <spawn>true</spawn> - <location>${basedir}/target/server0</location> - <testURI>tcp://localhost:61616</testURI> - <args> - <param>run</param> - </args> - <name>server0</name> - </configuration> - </execution> - <execution> - <id>runClient</id> - <goals> - <goal>runClient</goal> - </goals> - <configuration> - <clientClass>org.apache.activemq.artemis.core.example.VertxConnectorExample</clientClass> - </configuration> - </execution> - </executions> - <dependencies> - <dependency> - <groupId>org.apache.activemq.examples.modules</groupId> - <artifactId>artemis-vertx-example</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/examples/features/sub-modules/vertx/readme.html ---------------------------------------------------------------------- diff --git a/examples/features/sub-modules/vertx/readme.html b/examples/features/sub-modules/vertx/readme.html deleted file mode 100644 index e8f053a..0000000 --- a/examples/features/sub-modules/vertx/readme.html +++ /dev/null @@ -1,103 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -<html> - <head> - <title>ActiveMQ Artemis Vert.x Connector Service Example</title> - <link rel="stylesheet" type="text/css" href="../../../common/common.css" /> - <link rel="stylesheet" type="text/css" href="../../../common/prettify.css" /> - <script type="text/javascript" src="../../../common/prettify.js"></script> - </head> - <body onload="prettyPrint()"> - <h1>Vert.x Connector Service Example</h1> - - <p>This example shows you how to configure ActiveMQ Artemis to use the Vert.x Connector Service.</p> - - <p>ActiveMQ Artemis supports 2 types of Vert.x connector, incoming and outgoing. - Incoming connector consumes from Vert.x event bus and forwards to a configurable address. - Outgoing connector consumes from a configurable address and forwards to a configurable Vert.x event bus. - </p> - - <p>In this example, an incoming connector and an outgoing connector are configured. A simple java Verticle - is deployed. The verticle registers a message handler on the outgoing connector's address ("outgoing.vertx.address"). - A String message is sent to Vert.x event bus on the incoming connector's address("incoming.vertx.address"). - The message then will be forwarded to a ActiveMQ Artemis queue by the incoming connector. The outgoing connector listens to - the ActiveMQ Artemis queue and forwards the message from ActiveMQ Artemis to Vert.x event bus on the outgoing connector's address. - The verticle finally receives the message from it's event bus.</p> - - <p>For more information on Vert.x concept please visit the <a href="http://vertx.io/">Vertx site</a></p> - - <h2>Example step-by-step</h2> - <p><i>To run the server, simply type <code>mvn verify</code> - from this directory.</p> - - <ol> - <li>First we need to create a Vert.x PlatformManager</li> - <pre class="prettyprint"> - <code>platformManager = PlatformLocator.factory.createPlatformManager(PORT, HOST);</code> - </pre> - - <li>We deploy a Verticle using the platformManager</li> - <pre class="prettyprint"> - <code>String verticle = "org.apache.activemq.artemis.core.example.ExampleVerticle"; - platformManager.deployVerticle(verticle, null, new URL[0], 1, null, - new Handler<AsyncResult<String>>(){ - - @Override - public void handle(AsyncResult<String> result) - { - if (!result.succeeded()) - { - throw new RuntimeException("failed to deploy verticle", result.cause()); - } - latch0.countDown(); - } - - });</code> - </pre> - - <li>We register a message handler with the event bus in the Verticle to listen on the outgoing connector's address.</li> - <pre class="prettyprint"> - <code>EventBus eventBus = vertx.eventBus(); - eventBus.registerHandler(VertxConnectorExample.OUTGOING, - new Handler<Message<?>>() { - @Override - public void handle(Message<?> startMsg) - { - Object body = startMsg.body(); - System.out.println("Verticle receives a message: " + body); - VertxConnectorExample.result.set(VertxConnectorExample.MSG.equals(body)); - latch0.countDown(); - } - }); - </code> - </pre> - - <li>We send a message to incoming connector's address via event bus</li> - <pre class="prettyprint"> - <code> - EventBus bus = platformManager.vertx().eventBus(); - bus.send(INCOMING, MSG); - </code> - </pre> - - <li>The message will eventually arrives at the Verticle's message handler.</li> - </ol> - </body> -</html> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/ExampleVerticle.java ---------------------------------------------------------------------- diff --git a/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/ExampleVerticle.java b/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/ExampleVerticle.java deleted file mode 100644 index 3f248af..0000000 --- a/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/ExampleVerticle.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.example; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.vertx.java.core.Handler; -import org.vertx.java.core.eventbus.EventBus; -import org.vertx.java.core.eventbus.Message; -import org.vertx.java.platform.Verticle; - -public class ExampleVerticle extends Verticle { - - @Override - public void start() { - EventBus eventBus = vertx.eventBus(); - - final CountDownLatch latch0 = new CountDownLatch(1); - - // Register a handler on the outgoing connector's address - eventBus.registerHandler(VertxConnectorExample.OUTGOING, new Handler<Message<?>>() { - @Override - public void handle(Message<?> startMsg) { - Object body = startMsg.body(); - System.out.println("Verticle receives a message: " + body); - VertxConnectorExample.result.set(VertxConnectorExample.MSG.equals(body)); - latch0.countDown(); - //Tell the example to finish. - VertxConnectorExample.latch.countDown(); - } - }); - - try { - latch0.await(5000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/VertxConnectorExample.java ---------------------------------------------------------------------- diff --git a/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/VertxConnectorExample.java b/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/VertxConnectorExample.java deleted file mode 100644 index b8e6d98..0000000 --- a/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/VertxConnectorExample.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.example; - -import java.net.URL; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.vertx.java.core.AsyncResult; -import org.vertx.java.core.Handler; -import org.vertx.java.core.eventbus.EventBus; -import org.vertx.java.platform.PlatformLocator; -import org.vertx.java.platform.PlatformManager; -import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory; - -/** - * A simple example of using Vert.x connector service. - */ -public class VertxConnectorExample { - - public static final String INCOMING = "incoming.vertx.address"; - public static final String OUTGOING = "outgoing.vertx.address"; - public static final String MSG = "Welcome to Vertx world!"; - - public static final CountDownLatch latch = new CountDownLatch(1); - public static final AtomicBoolean result = new AtomicBoolean(false); - - private static final String HOST = "127.0.0.1"; - private static final int PORT = 0; - - public static void main(final String[] args) throws Exception { - System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName()); - PlatformManager platformManager = null; - - try { - // Step 1 Create a Vert.x PlatformManager - platformManager = PlatformLocator.factory.createPlatformManager(PORT, HOST); - - final CountDownLatch latch0 = new CountDownLatch(1); - - // Step 2 Deploy a Verticle to receive message - String verticle = "org.apache.activemq.artemis.core.example.ExampleVerticle"; - platformManager.deployVerticle(verticle, null, new URL[0], 1, null, new Handler<AsyncResult<String>>() { - - @Override - public void handle(AsyncResult<String> result) { - if (!result.succeeded()) { - throw new RuntimeException("failed to deploy verticle", result.cause()); - } - latch0.countDown(); - } - - }); - - latch0.await(); - - // Step 3 Send a message to the incoming connector's address - EventBus bus = platformManager.vertx().eventBus(); - bus.send(INCOMING, MSG); - - // Step 4 Waiting for the Verticle to process the message - latch.await(10000, TimeUnit.MILLISECONDS); - } finally { - if (platformManager != null) { - platformManager.undeployAll(null); - platformManager.stop(); - } - reportResultAndExit(); - } - } - - private static void reportResultAndExit() { - if (!result.get()) { - System.err.println(); - System.err.println("#####################"); - System.err.println("### FAILURE! ###"); - System.err.println("#####################"); - System.exit(1); - } else { - System.out.println(); - System.out.println("#####################"); - System.out.println("### SUCCESS! ###"); - System.out.println("#####################"); - System.exit(0); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/examples/features/sub-modules/vertx/src/main/resources/activemq/server0/broker.xml ---------------------------------------------------------------------- diff --git a/examples/features/sub-modules/vertx/src/main/resources/activemq/server0/broker.xml b/examples/features/sub-modules/vertx/src/main/resources/activemq/server0/broker.xml deleted file mode 100644 index c550203..0000000 --- a/examples/features/sub-modules/vertx/src/main/resources/activemq/server0/broker.xml +++ /dev/null @@ -1,82 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd"> - - <core xmlns="urn:activemq:core"> - <bindings-directory>target/server0/data/messaging/bindings</bindings-directory> - - <journal-directory>target/server0/data/messaging/journal</journal-directory> - - <large-messages-directory>target/server0/data/messaging/largemessages</large-messages-directory> - - <paging-directory>target/server0/data/messaging/paging</paging-directory> - <!-- Connectors --> - - <connectors> - <connector name="netty-connector">tcp://localhost:61616</connector> - </connectors> - - <!-- Acceptors --> - <acceptors> - <acceptor name="netty-acceptor">tcp://localhost:61616</acceptor> - </acceptors> - - <!-- Other config --> - - <security-settings> - <!--security for example queue--> - <security-setting match="queue.vertxQueue"> - <permission roles="guest" type="consume"/> - <permission roles="guest" type="send"/> - </security-setting> - </security-settings> - - - - <connector-services> - <connector-service name="my-incoming-vertx"> - <factory-class>org.apache.activemq.artemis.integration.vertx.VertxIncomingConnectorServiceFactory</factory-class> - <param key="queue" value="queue.vertxQueue"/> - <param key="host" value="localhost"/> - <param key="port" value="0"/> - <param key="vertx-address" value="incoming.vertx.address"/> - </connector-service> - <connector-service name="my-outgoing-vertx"> - <factory-class>org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServiceFactory</factory-class> - <param key="queue" value="queue.vertxQueue"/> - <param key="host" value="localhost"/> - <param key="port" value="0"/> - <param key="vertx-address" value="outgoing.vertx.address"/> - </connector-service> - </connector-services> - <addresses> - <address name="queue.vertxQueue"> - <multicast> - <queue name="queue.vertxQueue"/> - </multicast> - </address> - <address name="exampleQueue"> - <anycast> - <queue name="jms.queue.exampleQueue"/> - </anycast> - </address> - </addresses> - </core> - -</configuration> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/integration/activemq-vertx-integration/pom.xml ---------------------------------------------------------------------- diff --git a/integration/activemq-vertx-integration/pom.xml b/integration/activemq-vertx-integration/pom.xml deleted file mode 100644 index ef8d31c..0000000 --- a/integration/activemq-vertx-integration/pom.xml +++ /dev/null @@ -1,144 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.activemq</groupId> - <artifactId>artemis-pom</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>artemis-vertx-integration</artifactId> - <packaging>jar</packaging> - <name>ActiveMQ Artemis Vert.x Integration</name> - - <properties> - - <activemq.basedir>${project.basedir}/../..</activemq.basedir> - - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - - <!-- Set pullInDeps to true if you want any modules specified in the 'includes' and 'deploys' fields - in your mod.json to be automatically pulled in during packaging and added inside your module. Doing this means your - module won't download and install those dependencies at run-time when they're first requested. --> - <vertx.pullInDeps>false</vertx.pullInDeps> - - <!-- Set createFatJar to true if you want to create a fat executable jar which contains the Vert.x binaries - along with the module so it can be run with java -jar <jarname> --> - <vertx.createFatJar>false</vertx.createFatJar> - - <!--Vertx module name--> - <module.name>${project.groupId}~${project.artifactId}~${project.version}</module.name> - - <!-- The directory where the module will be assembled - you can override this on the command line - with -Dmods.directory=mydir --> - <mods.directory>target/mods</mods.directory> - - <!--Dependency versions--> - <vertx.version>2.1.2</vertx.version> - <vertx.testtools.version>2.0.3-final</vertx.testtools.version> - <junit.version>4.11</junit.version> - - <!--Plugin versions--> - <maven.compiler.plugin.version>3.0</maven.compiler.plugin.version> - <maven.resources.plugin.version>2.6</maven.resources.plugin.version> - <maven.clean.plugin.version>2.5</maven.clean.plugin.version> - <maven.vertx.plugin.version>2.0.8-final</maven.vertx.plugin.version> - <maven.surefire.plugin.version>2.14</maven.surefire.plugin.version> - <maven.failsafe.plugin.version>2.14</maven.failsafe.plugin.version> - <maven.surefire.report.plugin.version>2.14</maven.surefire.report.plugin.version> - <maven.javadoc.plugin.version>2.9</maven.javadoc.plugin.version> - <maven.dependency.plugin.version>2.7</maven.dependency.plugin.version> - </properties> - - <repositories> - <repository> - <id>sonatype-nexus-snapshots</id> - <url>https://oss.sonatype.org/content/repositories/public</url> - </repository> - </repositories> - - <dependencies> - <dependency> - <groupId>org.jboss.logging</groupId> - <artifactId>jboss-logging-processor</artifactId> - <scope>provided</scope> - <optional>true</optional> - </dependency> - - <!-- - JBoss Logging - --> - <dependency> - <groupId>org.jboss.logging</groupId> - <artifactId>jboss-logging</artifactId> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>artemis-server</artifactId> - <version>${project.version}</version> - </dependency> - - <!--Vertx provided dependencies--> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>vertx-core</artifactId> - <version>${vertx.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>vertx-platform</artifactId> - <version>${vertx.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>vertx-hazelcast</artifactId> - <version>${vertx.version}</version> - <scope>provided</scope> - </dependency> - <!--Test dependencies--> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.11</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>testtools</artifactId> - <version>${vertx.testtools.version}</version> - <scope>test</scope> - </dependency> - - <!-- Add any other dependencies that you want packaged into your module (in the lib dir) here - as 'compile' dependencies. Here is an example - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-core</artifactId> - <version>1.3</version> - <scope>compile</scope> - </dependency> - --> - - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java ---------------------------------------------------------------------- diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java deleted file mode 100644 index 1f30c4c..0000000 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.integration.vertx; - -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.jboss.logging.BasicLogger; -import org.jboss.logging.Logger; -import org.jboss.logging.annotations.LogMessage; -import org.jboss.logging.annotations.Message; -import org.jboss.logging.annotations.MessageLogger; - -/** - * Logger Code 19 - * - * each message id must be 6 digits long starting with 19, the 3rd digit donates the level so - * - * INF0 1 - * WARN 2 - * DEBUG 3 - * ERROR 4 - * TRACE 5 - * FATAL 6 - * - * so an INFO message would be 191000 to 191999 - */ -@MessageLogger(projectCode = "AMQ") -interface ActiveMQVertxLogger extends BasicLogger { - - /** - * The vertx logger. - */ - ActiveMQVertxLogger LOGGER = Logger.getMessageLogger(ActiveMQVertxLogger.class, ActiveMQVertxLogger.class.getPackage().getName()); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 192001, value = "Non vertx message: {0}", format = Message.Format.MESSAGE_FORMAT) - void nonVertxMessage(ServerMessage message); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 192002, value = "Invalid vertx type: {0}", format = Message.Format.MESSAGE_FORMAT) - void invalidVertxType(Integer type); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java ---------------------------------------------------------------------- diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java deleted file mode 100644 index 4d89e6d..0000000 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.integration.vertx; - -import java.util.Map; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.postoffice.Binding; -import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.server.ConnectorService; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -import org.apache.activemq.artemis.utils.ConfigurationHelper; -import org.vertx.java.core.Handler; -import org.vertx.java.core.buffer.Buffer; -import org.vertx.java.core.eventbus.EventBus; -import org.vertx.java.core.eventbus.Message; -import org.vertx.java.core.eventbus.ReplyException; -import org.vertx.java.core.eventbus.impl.PingMessage; -import org.vertx.java.core.json.JsonArray; -import org.vertx.java.core.json.JsonObject; -import org.vertx.java.platform.PlatformLocator; -import org.vertx.java.platform.PlatformManager; -import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory; - -class IncomingVertxEventHandler implements ConnectorService { - - private final String connectorName; - - private final String queueName; - - private final int port; - - private final String host; - - private final int quorumSize; - - private final String haGroup; - - private final String vertxAddress; - - private EventBus eventBus; - - private PlatformManager platformManager; - - private EventHandler handler; - - private final StorageManager storageManager; - - private final PostOffice postOffice; - - private boolean isStarted = false; - - IncomingVertxEventHandler(String connectorName, - Map<String, Object> configuration, - StorageManager storageManager, - PostOffice postOffice) { - this.connectorName = connectorName; - this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration); - - this.port = ConfigurationHelper.getIntProperty(VertxConstants.PORT, 0, configuration); - this.host = ConfigurationHelper.getStringProperty(VertxConstants.HOST, "localhost", configuration); - this.quorumSize = ConfigurationHelper.getIntProperty(VertxConstants.VERTX_QUORUM_SIZE, -1, configuration); - this.haGroup = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_HA_GROUP, "activemq", configuration); - this.vertxAddress = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_ADDRESS, "org.apache.activemq", configuration); - - this.storageManager = storageManager; - this.postOffice = postOffice; - } - - @Override - public void start() throws Exception { - if (this.isStarted) { - return; - } - System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName()); - if (quorumSize != -1) { - platformManager = PlatformLocator.factory.createPlatformManager(port, host, quorumSize, haGroup); - } else { - platformManager = PlatformLocator.factory.createPlatformManager(port, host); - } - - eventBus = platformManager.vertx().eventBus(); - - Binding b = postOffice.getBinding(new SimpleString(queueName)); - if (b == null) { - throw new Exception(connectorName + ": queue " + queueName + " not found"); - } - - handler = new EventHandler(); - eventBus.registerHandler(vertxAddress, handler); - - isStarted = true; - ActiveMQVertxLogger.LOGGER.debug(connectorName + ": started"); - } - - @Override - public void stop() throws Exception { - if (!isStarted) { - return; - } - eventBus.unregisterHandler(vertxAddress, handler); - platformManager.stop(); - System.clearProperty("vertx.clusterManagerFactory"); - isStarted = false; - ActiveMQVertxLogger.LOGGER.debug(connectorName + ": stopped"); - } - - @Override - public boolean isStarted() { - return isStarted; - } - - @Override - public String getName() { - return connectorName; - } - - private class EventHandler implements Handler<Message<?>> { - - @Override - public void handle(Message<?> message) { - ServerMessage msg = new ServerMessageImpl(storageManager.generateID(), VertxConstants.INITIAL_MESSAGE_BUFFER_SIZE); - msg.setAddress(new SimpleString(queueName)); - msg.setDurable(true); - msg.encodeMessageIDToBuffer(); - - String replyAddress = message.replyAddress(); - if (replyAddress != null) { - msg.putStringProperty(VertxConstants.VERTX_MESSAGE_REPLYADDRESS, replyAddress); - } - - // it'd be better that Message expose its type information - int type = getMessageType(message); - - msg.putIntProperty(VertxConstants.VERTX_MESSAGE_TYPE, type); - - manualEncodeVertxMessageBody(msg.getBodyBuffer(), message.body(), type); - - try { - postOffice.route(msg, false); - } catch (Exception e) { - ActiveMQVertxLogger.LOGGER.error("failed to route msg " + msg, e); - } - } - - private void manualEncodeVertxMessageBody(ActiveMQBuffer bodyBuffer, Object body, int type) { - switch (type) { - case VertxConstants.TYPE_BOOLEAN: - bodyBuffer.writeBoolean(((Boolean) body)); - break; - case VertxConstants.TYPE_BUFFER: - Buffer buff = (Buffer) body; - int len = buff.length(); - bodyBuffer.writeInt(len); - bodyBuffer.writeBytes(((Buffer) body).getBytes()); - break; - case VertxConstants.TYPE_BYTEARRAY: - byte[] bytes = (byte[]) body; - bodyBuffer.writeInt(bytes.length); - bodyBuffer.writeBytes(bytes); - break; - case VertxConstants.TYPE_BYTE: - bodyBuffer.writeByte((byte) body); - break; - case VertxConstants.TYPE_CHARACTER: - bodyBuffer.writeChar((Character) body); - break; - case VertxConstants.TYPE_DOUBLE: - bodyBuffer.writeDouble((double) body); - break; - case VertxConstants.TYPE_FLOAT: - bodyBuffer.writeFloat((Float) body); - break; - case VertxConstants.TYPE_INT: - bodyBuffer.writeInt((Integer) body); - break; - case VertxConstants.TYPE_LONG: - bodyBuffer.writeLong((Long) body); - break; - case VertxConstants.TYPE_SHORT: - bodyBuffer.writeShort((Short) body); - break; - case VertxConstants.TYPE_STRING: - case VertxConstants.TYPE_PING: - bodyBuffer.writeString((String) body); - break; - case VertxConstants.TYPE_JSON_OBJECT: - bodyBuffer.writeString(((JsonObject) body).encode()); - break; - case VertxConstants.TYPE_JSON_ARRAY: - bodyBuffer.writeString(((JsonArray) body).encode()); - break; - case VertxConstants.TYPE_REPLY_FAILURE: - ReplyException except = (ReplyException) body; - bodyBuffer.writeInt(except.failureType().toInt()); - bodyBuffer.writeInt(except.failureCode()); - bodyBuffer.writeString(except.getMessage()); - break; - default: - throw new IllegalArgumentException("Invalid body type: " + type); - } - } - - private int getMessageType(Message<?> message) { - - Object body = message.body(); - - if (message instanceof PingMessage) { - return VertxConstants.TYPE_PING; - } else if (body instanceof Buffer) { - return VertxConstants.TYPE_BUFFER; - } else if (body instanceof Boolean) { - return VertxConstants.TYPE_BOOLEAN; - } else if (body instanceof byte[]) { - return VertxConstants.TYPE_BYTEARRAY; - } else if (body instanceof Byte) { - return VertxConstants.TYPE_BYTE; - } else if (body instanceof Character) { - return VertxConstants.TYPE_CHARACTER; - } else if (body instanceof Double) { - return VertxConstants.TYPE_DOUBLE; - } else if (body instanceof Float) { - return VertxConstants.TYPE_FLOAT; - } else if (body instanceof Integer) { - return VertxConstants.TYPE_INT; - } else if (body instanceof Long) { - return VertxConstants.TYPE_LONG; - } else if (body instanceof Short) { - return VertxConstants.TYPE_SHORT; - } else if (body instanceof String) { - return VertxConstants.TYPE_STRING; - } else if (body instanceof JsonArray) { - return VertxConstants.TYPE_JSON_ARRAY; - } else if (body instanceof JsonObject) { - return VertxConstants.TYPE_JSON_OBJECT; - } else if (body instanceof ReplyException) { - return VertxConstants.TYPE_REPLY_FAILURE; - } - throw new IllegalArgumentException("Type not supported: " + message); - } - - } - - @Override - public String toString() { - return "[IncomingVertxEventHandler(" + connectorName + "), queueName: " + queueName + " host: " + host + " port: " + port + " vertxAddress: " + vertxAddress + "]"; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java ---------------------------------------------------------------------- diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java deleted file mode 100644 index 8820c39..0000000 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.integration.vertx; - -import java.util.List; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.postoffice.Binding; -import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.server.ConnectorService; -import org.apache.activemq.artemis.core.server.Consumer; -import org.apache.activemq.artemis.core.server.HandleStatus; -import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.utils.ConfigurationHelper; -import org.vertx.java.core.buffer.Buffer; -import org.vertx.java.core.eventbus.EventBus; -import org.vertx.java.core.eventbus.ReplyException; -import org.vertx.java.core.eventbus.ReplyFailure; -import org.vertx.java.core.json.JsonArray; -import org.vertx.java.core.json.JsonObject; -import org.vertx.java.platform.PlatformLocator; -import org.vertx.java.platform.PlatformManager; -import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory; - -class OutgoingVertxEventHandler implements Consumer, ConnectorService { - - private final String connectorName; - - private final String queueName; - - private final int port; - - private final String host; - - private final int quorumSize; - - private final String haGroup; - - private final String vertxAddress; - - private final boolean publish; - - private final PostOffice postOffice; - - private Queue queue = null; - - private Filter filter = null; - - private EventBus eventBus; - - private PlatformManager platformManager; - - private boolean isStarted = false; - - OutgoingVertxEventHandler(String connectorName, Map<String, Object> configuration, PostOffice postOffice) { - this.connectorName = connectorName; - this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration); - this.postOffice = postOffice; - - this.port = ConfigurationHelper.getIntProperty(VertxConstants.PORT, 0, configuration); - this.host = ConfigurationHelper.getStringProperty(VertxConstants.HOST, "localhost", configuration); - this.quorumSize = ConfigurationHelper.getIntProperty(VertxConstants.VERTX_QUORUM_SIZE, -1, configuration); - this.haGroup = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_HA_GROUP, "activemq", configuration); - this.vertxAddress = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_ADDRESS, "org.apache.activemq", configuration); - this.publish = ConfigurationHelper.getBooleanProperty(VertxConstants.VERTX_PUBLISH, false, configuration); - } - - @Override - public void start() throws Exception { - if (this.isStarted) { - return; - } - System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName()); - if (quorumSize != -1) { - platformManager = PlatformLocator.factory.createPlatformManager(port, host, quorumSize, haGroup); - } else { - platformManager = PlatformLocator.factory.createPlatformManager(port, host); - } - - eventBus = platformManager.vertx().eventBus(); - - if (this.connectorName == null || this.connectorName.trim().equals("")) { - throw new Exception("invalid connector name: " + this.connectorName); - } - - if (this.queueName == null || this.queueName.trim().equals("")) { - throw new Exception("invalid queue name: " + queueName); - } - - SimpleString name = new SimpleString(this.queueName); - Binding b = this.postOffice.getBinding(name); - if (b == null) { - throw new Exception(connectorName + ": queue " + queueName + " not found"); - } - this.queue = (Queue) b.getBindable(); - this.queue.addConsumer(this); - - this.queue.deliverAsync(); - this.isStarted = true; - - ActiveMQVertxLogger.LOGGER.debug(connectorName + ": started"); - } - - @Override - public void stop() throws Exception { - if (!this.isStarted) { - return; - } - - ActiveMQVertxLogger.LOGGER.debug(connectorName + ": receive shutdown request"); - - this.queue.removeConsumer(this); - - this.platformManager.stop(); - System.clearProperty("vertx.clusterManagerFactory"); - this.isStarted = false; - ActiveMQVertxLogger.LOGGER.debug(connectorName + ": stopped"); - } - - @Override - public boolean isStarted() { - return this.isStarted; - } - - @Override - public String getName() { - return this.connectorName; - } - - @Override - public HandleStatus handle(MessageReference ref) throws Exception { - if (filter != null && !filter.match(ref.getMessage())) { - return HandleStatus.NO_MATCH; - } - - synchronized (this) { - ref.handled(); - - ServerMessage message = ref.getMessage(); - - Object vertxMsgBody; - // extract information from message - Integer type = message.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE); - - if (type == null) { - // log a warning and default to raw bytes - ActiveMQVertxLogger.LOGGER.nonVertxMessage(message); - type = VertxConstants.TYPE_RAWBYTES; - } - - // from vertx - vertxMsgBody = extractMessageBody(message, type); - - if (vertxMsgBody == null) { - return HandleStatus.NO_MATCH; - } - - // send to bus - if (!publish) { - eventBus.send(vertxAddress, vertxMsgBody); - } else { - eventBus.publish(vertxAddress, vertxMsgBody); - } - - queue.acknowledge(ref); - - ActiveMQVertxLogger.LOGGER.debug(connectorName + ": forwarded to vertx: " + message.getMessageID()); - return HandleStatus.HANDLED; - } - } - - private Object extractMessageBody(ServerMessage message, Integer type) throws Exception { - Object vertxMsgBody = null; - ActiveMQBuffer bodyBuffer = message.getBodyBuffer(); - switch (type) { - case VertxConstants.TYPE_PING: - case VertxConstants.TYPE_STRING: - bodyBuffer.resetReaderIndex(); - vertxMsgBody = bodyBuffer.readString(); - break; - case VertxConstants.TYPE_BUFFER: - int len = bodyBuffer.readInt(); - byte[] bytes = new byte[len]; - bodyBuffer.readBytes(bytes); - vertxMsgBody = new Buffer(bytes); - break; - case VertxConstants.TYPE_BOOLEAN: - vertxMsgBody = bodyBuffer.readBoolean(); - break; - case VertxConstants.TYPE_BYTEARRAY: - int length = bodyBuffer.readInt(); - byte[] byteArray = new byte[length]; - bodyBuffer.readBytes(byteArray); - vertxMsgBody = byteArray; - break; - case VertxConstants.TYPE_BYTE: - vertxMsgBody = bodyBuffer.readByte(); - break; - case VertxConstants.TYPE_CHARACTER: - vertxMsgBody = bodyBuffer.readChar(); - break; - case VertxConstants.TYPE_DOUBLE: - vertxMsgBody = bodyBuffer.readDouble(); - break; - case VertxConstants.TYPE_FLOAT: - vertxMsgBody = bodyBuffer.readFloat(); - break; - case VertxConstants.TYPE_INT: - vertxMsgBody = bodyBuffer.readInt(); - break; - case VertxConstants.TYPE_LONG: - vertxMsgBody = bodyBuffer.readLong(); - break; - case VertxConstants.TYPE_SHORT: - vertxMsgBody = bodyBuffer.readShort(); - break; - case VertxConstants.TYPE_JSON_OBJECT: - vertxMsgBody = new JsonObject(bodyBuffer.readString()); - break; - case VertxConstants.TYPE_JSON_ARRAY: - vertxMsgBody = new JsonArray(bodyBuffer.readString()); - break; - case VertxConstants.TYPE_REPLY_FAILURE: - int failureType = bodyBuffer.readInt(); - int failureCode = bodyBuffer.readInt(); - String errMsg = bodyBuffer.readString(); - vertxMsgBody = new ReplyException(ReplyFailure.fromInt(failureType), failureCode, errMsg); - break; - case VertxConstants.TYPE_RAWBYTES: - int size = bodyBuffer.readableBytes(); - byte[] rawBytes = new byte[size]; - bodyBuffer.readBytes(rawBytes); - vertxMsgBody = rawBytes; - break; - default: - ActiveMQVertxLogger.LOGGER.invalidVertxType(type); - break; - } - return vertxMsgBody; - } - - @Override - public void proceedDeliver(MessageReference reference) throws Exception { - // no op - } - - @Override - public Filter getFilter() { - return this.filter; - } - - @Override - public String debug() { - return null; - } - - @Override - public String toManagementString() { - return null; - } - - @Override - public List<MessageReference> getDeliveringMessages() { - return null; - } - - @Override - public void disconnect() { - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxConstants.java ---------------------------------------------------------------------- diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxConstants.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxConstants.java deleted file mode 100644 index e0d1537..0000000 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxConstants.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.integration.vertx; - -import java.util.HashSet; -import java.util.Set; - -public class VertxConstants { - - // org.vertx.java.core.eventbus.impl.MessageFactory - public static final int TYPE_PING = 0; - public static final int TYPE_BUFFER = 1; - public static final int TYPE_BOOLEAN = 2; - public static final int TYPE_BYTEARRAY = 3; - public static final int TYPE_BYTE = 4; - public static final int TYPE_CHARACTER = 5; - public static final int TYPE_DOUBLE = 6; - public static final int TYPE_FLOAT = 7; - public static final int TYPE_INT = 8; - public static final int TYPE_LONG = 9; - public static final int TYPE_SHORT = 10; - public static final int TYPE_STRING = 11; - public static final int TYPE_JSON_OBJECT = 12; - public static final int TYPE_JSON_ARRAY = 13; - public static final int TYPE_REPLY_FAILURE = 100; - public static final int TYPE_RAWBYTES = 200; - - public static final String PORT = "port"; - public static final String HOST = "host"; - public static final String QUEUE_NAME = "queue"; - public static final String VERTX_ADDRESS = "vertx-address"; - public static final String VERTX_PUBLISH = "publish"; - public static final String VERTX_QUORUM_SIZE = "quorum-size"; - public static final String VERTX_HA_GROUP = "ha-group"; - - public static final Set<String> ALLOWABLE_INCOMING_CONNECTOR_KEYS; - public static final Set<String> REQUIRED_INCOMING_CONNECTOR_KEYS; - public static final Set<String> ALLOWABLE_OUTGOING_CONNECTOR_KEYS; - public static final Set<String> REQUIRED_OUTGOING_CONNECTOR_KEYS; - public static final int INITIAL_MESSAGE_BUFFER_SIZE = 50; - public static final String VERTX_MESSAGE_REPLYADDRESS = "VertxMessageReplyAddress"; - public static final String VERTX_MESSAGE_TYPE = "VertxMessageType"; - - static { - ALLOWABLE_INCOMING_CONNECTOR_KEYS = new HashSet<>(); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(PORT); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(HOST); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_ADDRESS); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_QUORUM_SIZE); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_HA_GROUP); - - REQUIRED_INCOMING_CONNECTOR_KEYS = new HashSet<>(); - REQUIRED_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME); - - ALLOWABLE_OUTGOING_CONNECTOR_KEYS = new HashSet<>(); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(PORT); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(HOST); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_ADDRESS); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_PUBLISH); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_QUORUM_SIZE); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_HA_GROUP); - - REQUIRED_OUTGOING_CONNECTOR_KEYS = new HashSet<>(); - REQUIRED_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java ---------------------------------------------------------------------- diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java deleted file mode 100644 index 03afe20..0000000 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.integration.vertx; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; - -import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.server.ConnectorService; -import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; - -public class VertxIncomingConnectorServiceFactory implements ConnectorServiceFactory { - - @Override - public ConnectorService createConnectorService(String connectorName, - Map<String, Object> configuration, - StorageManager storageManager, - PostOffice postOffice, - ScheduledExecutorService scheduledThreadPool) { - - return new IncomingVertxEventHandler(connectorName, configuration, storageManager, postOffice); - - } - - @Override - public Set<String> getAllowableProperties() { - return VertxConstants.ALLOWABLE_INCOMING_CONNECTOR_KEYS; - } - - @Override - public Set<String> getRequiredProperties() { - return VertxConstants.REQUIRED_INCOMING_CONNECTOR_KEYS; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java ---------------------------------------------------------------------- diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java deleted file mode 100644 index 2ae0848..0000000 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.integration.vertx; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; - -import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.server.ConnectorService; -import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; - -public class VertxOutgoingConnectorServiceFactory implements ConnectorServiceFactory { - - @Override - public ConnectorService createConnectorService(String connectorName, - Map<String, Object> configuration, - StorageManager storageManager, - PostOffice postOffice, - ScheduledExecutorService scheduledThreadPool) { - return new OutgoingVertxEventHandler(connectorName, configuration, postOffice); - } - - @Override - public Set<String> getAllowableProperties() { - return VertxConstants.ALLOWABLE_OUTGOING_CONNECTOR_KEYS; - } - - @Override - public Set<String> getRequiredProperties() { - return VertxConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 454aad2..d182bbb 100644 --- a/pom.xml +++ b/pom.xml @@ -54,7 +54,6 @@ <module>artemis-server-osgi</module> <module>integration/activemq-spring-integration</module> <module>integration/activemq-aerogear-integration</module> - <module>integration/activemq-vertx-integration</module> <module>artemis-distribution</module> <module>artemis-tools</module> <module>tests</module> @@ -736,7 +735,6 @@ <module>artemis-jdbc-store</module> <module>integration/activemq-spring-integration</module> <module>integration/activemq-aerogear-integration</module> - <module>integration/activemq-vertx-integration</module> <module>tests</module> </modules> <properties> @@ -772,7 +770,6 @@ <module>artemis-maven-plugin</module> <module>integration/activemq-spring-integration</module> <module>integration/activemq-aerogear-integration</module> - <module>integration/activemq-vertx-integration</module> <module>examples</module> <module>tests</module> <module>artemis-distribution</module> @@ -831,7 +828,6 @@ <module>artemis-maven-plugin</module> <module>integration/activemq-spring-integration</module> <module>integration/activemq-aerogear-integration</module> - <module>integration/activemq-vertx-integration</module> <module>tests</module> </modules> <properties> @@ -874,7 +870,6 @@ <module>artemis-maven-plugin</module> <module>integration/activemq-spring-integration</module> <module>integration/activemq-aerogear-integration</module> - <module>integration/activemq-vertx-integration</module> <module>tests</module> </modules> <properties> @@ -909,7 +904,6 @@ <module>artemis-maven-plugin</module> <module>integration/activemq-spring-integration</module> <module>integration/activemq-aerogear-integration</module> - <module>integration/activemq-vertx-integration</module> <module>tests</module> <module>examples</module> </modules> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/tests/integration-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 8dd8b16..afdaee2 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -30,8 +30,6 @@ <activemq.basedir>${project.basedir}/../..</activemq.basedir> <karaf.version>4.0.6</karaf.version> <pax.exam.version>4.9.1</pax.exam.version> - <vertx.version>2.1.6</vertx.version> - <vertx.testtools.version>2.0.3-final</vertx.testtools.version> </properties> <repositories> @@ -123,11 +121,6 @@ </dependency> <dependency> <groupId>org.apache.activemq</groupId> - <artifactId>artemis-vertx-integration</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> <artifactId>artemis-journal</artifactId> <version>${project.version}</version> </dependency> @@ -248,37 +241,6 @@ <scope>test</scope> </dependency> - <!--Vertx provided dependencies--> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>vertx-core</artifactId> - <version>${vertx.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>vertx-platform</artifactId> - <version>${vertx.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>vertx-hazelcast</artifactId> - <version>${vertx.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>io.vertx</groupId> - <artifactId>testtools</artifactId> - <version>${vertx.testtools.version}</version> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a98b280f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 7a12f42..91db1d2a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.MQTTException; @@ -57,7 +58,6 @@ import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.vertx.java.core.impl.ConcurrentHashSet; /** * QT
