This is an automated email from the ASF dual-hosted git repository. jgallimore pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomee-chatterbox.git
commit ccb9450089f0d3c6dbf39e8455c53e6eb7271fe1 Author: Jonathan Gallimore <[email protected]> AuthorDate: Wed Jun 9 11:44:40 2021 +0100 WIP --- chatterbox-nats/chatterbox-nats-api/pom.xml | 6 ++ .../tomee/chatterbox/nats/api/InboundListener.java | 4 +- .../tomee/chatterbox/nats/api/NATSConnection.java | 2 +- .../tomee/chatterbox/nats/api/NATSException.java | 18 ++++ .../tomee/chatterbox/nats/api/NATSMessage.java | 23 ----- chatterbox-nats/chatterbox-nats-impl/pom.xml | 1 + .../nats/adapter/NATSActivationSpec.java | 9 ++ .../nats/adapter/NATSResourceAdapter.java | 101 +++++++++++++++++---- .../nats/adapter/out/NATSConnectionImpl.java | 5 +- .../nats/adapter/out/NATSManagedConnection.java | 5 +- .../src/main/rar/META-INF/ra.xml | 14 +-- chatterbox-nats/chatterbox-nats-sample-war/pom.xml | 9 +- .../superbiz/{SystemBean.java => EchoBean.java} | 22 ++++- .../src/main/java/org/superbiz/Sender.java | 11 ++- 14 files changed, 166 insertions(+), 64 deletions(-) diff --git a/chatterbox-nats/chatterbox-nats-api/pom.xml b/chatterbox-nats/chatterbox-nats-api/pom.xml index 6f4ca62..db16e68 100644 --- a/chatterbox-nats/chatterbox-nats-api/pom.xml +++ b/chatterbox-nats/chatterbox-nats-api/pom.xml @@ -45,5 +45,11 @@ <artifactId>javaee-api</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>io.nats</groupId> + <artifactId>java-nats-streaming</artifactId> + <version>2.2.3</version> + <scope>provided</scope> + </dependency> </dependencies> </project> diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java index 5f7ea6c..94d6294 100644 --- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java +++ b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java @@ -16,11 +16,13 @@ */ package org.apache.tomee.chatterbox.nats.api; +import io.nats.streaming.Message; + /** * @version $Revision$ $Date$ */ public interface InboundListener { - public void onMessage(final NATSMessage message) throws NATSException; + public void onMessage(final Message message) throws NATSException; } diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java index e323dd1..8b39874 100755 --- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java +++ b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java @@ -20,7 +20,7 @@ package org.apache.tomee.chatterbox.nats.api; public interface NATSConnection { - public void sendMessage(final String channel, final String message); + public void publish(final String subject, final byte[] data) throws NATSException; public void close(); } diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java index 031ad42..b761cd3 100644 --- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java +++ b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java @@ -20,4 +20,22 @@ package org.apache.tomee.chatterbox.nats.api; * @version $Revision$ $Date$ */ public class NATSException extends Exception { + public NATSException() { + } + + public NATSException(String message) { + super(message); + } + + public NATSException(String message, Throwable cause) { + super(message, cause); + } + + public NATSException(Throwable cause) { + super(cause); + } + + public NATSException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } } diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSMessage.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSMessage.java deleted file mode 100644 index afb7718..0000000 --- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSMessage.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tomee.chatterbox.nats.api; - -/** - * @version $Revision$ $Date$ - */ -public class NATSMessage { -} diff --git a/chatterbox-nats/chatterbox-nats-impl/pom.xml b/chatterbox-nats/chatterbox-nats-impl/pom.xml index fbe5f30..d46e53a 100644 --- a/chatterbox-nats/chatterbox-nats-impl/pom.xml +++ b/chatterbox-nats/chatterbox-nats-impl/pom.xml @@ -59,6 +59,7 @@ <groupId>io.nats</groupId> <artifactId>java-nats-streaming</artifactId> <version>2.2.3</version> + <scope>provided</scope> </dependency> </dependencies> </project> diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java index 4b62580..1a2fcc4 100644 --- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java +++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java @@ -29,6 +29,7 @@ public class NATSActivationSpec implements ActivationSpec { private ResourceAdapter resourceAdapter; private Class beanClass; + private String subject; public Class getBeanClass() { return beanClass; @@ -38,6 +39,14 @@ public class NATSActivationSpec implements ActivationSpec { this.beanClass = beanClass; } + public String getSubject() { + return subject; + } + + public void setSubject(String subject) { + this.subject = subject; + } + @Override public void validate() throws InvalidPropertyException { } diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java index 54d3a62..804a24f 100644 --- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java +++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java @@ -16,6 +16,15 @@ */ package org.apache.tomee.chatterbox.nats.adapter; +import io.nats.streaming.Message; +import io.nats.streaming.MessageHandler; +import io.nats.streaming.Options; +import io.nats.streaming.StreamingConnection; +import io.nats.streaming.StreamingConnectionFactory; +import io.nats.streaming.Subscription; +import org.apache.tomee.chatterbox.nats.api.InboundListener; +import org.apache.tomee.chatterbox.nats.api.NATSException; + import javax.resource.ResourceException; import javax.resource.spi.ActivationSpec; import javax.resource.spi.BootstrapContext; @@ -28,6 +37,8 @@ import javax.resource.spi.endpoint.MessageEndpointFactory; import javax.resource.spi.work.Work; import javax.resource.spi.work.WorkManager; import javax.transaction.xa.XAResource; +import java.io.IOException; +import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.lang.IllegalStateException; @@ -36,20 +47,43 @@ import java.lang.IllegalStateException; public class NATSResourceAdapter implements ResourceAdapter { final Map<NATSActivationSpec, EndpointTarget> targets = new ConcurrentHashMap<NATSActivationSpec, EndpointTarget>(); + private static final Method ONMESSAGE; + + static { + try { + ONMESSAGE = InboundListener.class.getMethod("onMessage", Message.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @ConfigProperty - private String token; + private String baseAddress; private WorkManager workManager; - private String user; - private String userId; + private StreamingConnectionFactory cf; + private StreamingConnection connection; public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException { workManager = bootstrapContext.getWorkManager(); - // connect to NATS + + try { + cf = new + StreamingConnectionFactory(new Options.Builder().natsUrl(baseAddress) + .clusterId("cluster-id").clientId("client-id").build()); + + connection = cf.createConnection(); + } catch (Throwable t) { + // TODO: log this + } } public void stop() { - // disconnect + try { + connection.close(); + } catch (Throwable t) { + // TODO: log this + } } public void endpointActivation(final MessageEndpointFactory messageEndpointFactory, final ActivationSpec activationSpec) @@ -64,11 +98,10 @@ public class NATSResourceAdapter implements ResourceAdapter { final MessageEndpoint messageEndpoint = messageEndpointFactory.createEndpoint(null); final EndpointTarget target = new EndpointTarget(messageEndpoint); - final Class<?> endpointClass = NATSActivationSpec.getBeanClass() != null ? NATSActivationSpec - .getBeanClass() : messageEndpointFactory.getEndpointClass(); - - targets.put(NATSActivationSpec, target); + + final Subscription subscription = connection.subscribe(((NATSActivationSpec) activationSpec).getSubject(), target); + target.setSubscription(subscription); } catch (Exception e) { e.printStackTrace(); } @@ -83,15 +116,14 @@ public class NATSResourceAdapter implements ResourceAdapter { } public void endpointDeactivation(MessageEndpointFactory messageEndpointFactory, ActivationSpec activationSpec) { - final NATSActivationSpec telnetActivationSpec = (NATSActivationSpec) activationSpec; + final NATSActivationSpec natsActivationSpec = (NATSActivationSpec) activationSpec; - final EndpointTarget endpointTarget = targets.get(telnetActivationSpec); + final EndpointTarget endpointTarget = targets.get(natsActivationSpec); if (endpointTarget == null) { throw new IllegalStateException("No EndpointTarget to undeploy for ActivationSpec " + activationSpec); } - // unsubscribe - + endpointTarget.close(); endpointTarget.messageEndpoint.release(); } @@ -99,17 +131,54 @@ public class NATSResourceAdapter implements ResourceAdapter { return new XAResource[0]; } - public void sendMessage(final String channel, final String message) { + public void publish(final String subject, final byte[] data) throws NATSException { // publish a message + try { + connection.publish(subject, data); + } catch (Exception e) { + throw new NATSException(e); + } } - private static class EndpointTarget { + private static class EndpointTarget implements MessageHandler { private final MessageEndpoint messageEndpoint; + private Subscription subscription; - public EndpointTarget(MessageEndpoint messageEndpoint) { + public EndpointTarget(final MessageEndpoint messageEndpoint) { this.messageEndpoint = messageEndpoint; } + @Override + public void onMessage(final Message msg) { + try { + try { + messageEndpoint.beforeDelivery(ONMESSAGE); + ((InboundListener) messageEndpoint).onMessage(msg); + } finally { + messageEndpoint.afterDelivery(); + } + } catch (Throwable t) { + // TODO: log this + } + } + + public void setSubscription(final Subscription subscription) { + this.subscription = subscription; + } + + public Subscription getSubscription() { + return subscription; + } + + public void close() { + try { + if (subscription != null) { + subscription.close(true); + } + } catch (IOException e) { + // TODO: log this + } + } } } diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java index 8b375f7..d743b9f 100755 --- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java +++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java @@ -19,6 +19,7 @@ package org.apache.tomee.chatterbox.nats.adapter.out; import org.apache.tomee.chatterbox.nats.api.NATSConnection; +import org.apache.tomee.chatterbox.nats.api.NATSException; import java.util.logging.Logger; @@ -34,8 +35,8 @@ public class NATSConnectionImpl implements NATSConnection { this.mcf = mcf; } - public void sendMessage(final String channel, final String message) { - mc.sendMessage(channel, message); + public void publish(final String subject, final byte[] data) throws NATSException { + mc.publish(subject, data); } public void close() { diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java index 0999036..d7043b2 100755 --- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java +++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java @@ -20,6 +20,7 @@ package org.apache.tomee.chatterbox.nats.adapter.out; import org.apache.tomee.chatterbox.nats.adapter.NATSResourceAdapter; import org.apache.tomee.chatterbox.nats.api.NATSConnection; +import org.apache.tomee.chatterbox.nats.api.NATSException; import javax.resource.NotSupportedException; import javax.resource.ResourceException; @@ -131,10 +132,10 @@ public class NATSManagedConnection implements ManagedConnection { return new NATSManagedConnectionMetaData(); } - void sendMessage(final String channel, final String message) { + void publish(final String subject, final byte[] data) throws NATSException { log.finest("sendMessage()"); final NATSResourceAdapter resourceAdapter = (NATSResourceAdapter) mcf.getResourceAdapter(); - resourceAdapter.sendMessage(channel, message); + resourceAdapter.publish(subject, data); } } diff --git a/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml b/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml index c44905f..3d4ed8e 100644 --- a/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml +++ b/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml @@ -22,9 +22,9 @@ xmlns="http://xmlns.jcp.org/xml/ns/javaee" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/connector_1_7.xsd" version="1.7"> - <description>Chatterbox Slack Connector</description> - <display-name>Chatterbox Slack Connector</display-name> - <eis-type>Slack Connector</eis-type> + <description>Chatterbox NATS Connector</description> + <display-name>Chatterbox NATS Connector</display-name> + <eis-type>NATS Connector</eis-type> <resourceadapter-version>1.0</resourceadapter-version> <license> <license-required>false</license-required> @@ -32,18 +32,18 @@ <resourceadapter> <resourceadapter-class>org.apache.tomee.chatterbox.nats.adapter.NATSResourceAdapter</resourceadapter-class> <config-property> - <config-property-name>token</config-property-name> + <config-property-name>baseAddress</config-property-name> <config-property-type>String</config-property-type> </config-property> <outbound-resourceadapter> <connection-definition> <managedconnectionfactory-class>org.apache.tomee.chatterbox.nats.adapter.out.NATSManagedConnectionFactory </managedconnectionfactory-class> - <connectionfactory-interface>SlackConnectionFactory + <connectionfactory-interface>org.apache.tomee.chatterbox.nats.api.NATSConnectionFactory </connectionfactory-interface> <connectionfactory-impl-class>org.apache.tomee.chatterbox.nats.adapter.out.NATSConnectionFactoryImpl </connectionfactory-impl-class> - <connection-interface>SlackConnection</connection-interface> + <connection-interface>org.apache.tomee.chatterbox.nats.api.NATSConnection</connection-interface> <connection-impl-class>org.apache.tomee.chatterbox.nats.adapter.out.NATSConnectionImpl</connection-impl-class> </connection-definition> <transaction-support>NoTransaction</transaction-support> @@ -52,7 +52,7 @@ <inbound-resourceadapter> <messageadapter> <messagelistener> - <messagelistener-type>InboundListener</messagelistener-type> + <messagelistener-type>org.apache.tomee.chatterbox.nats.api.InboundListener</messagelistener-type> <activationspec> <activationspec-class>org.apache.tomee.chatterbox.nats.adapter.NATSActivationSpec</activationspec-class> </activationspec> diff --git a/chatterbox-nats/chatterbox-nats-sample-war/pom.xml b/chatterbox-nats/chatterbox-nats-sample-war/pom.xml index aae7a99..25827a3 100644 --- a/chatterbox-nats/chatterbox-nats-sample-war/pom.xml +++ b/chatterbox-nats/chatterbox-nats-sample-war/pom.xml @@ -40,6 +40,12 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>io.nats</groupId> + <artifactId>java-nats-streaming</artifactId> + <version>2.2.3</version> + <scope>provided</scope> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>chatterbox-nats-impl</artifactId> <version>${project.version}</version> @@ -89,8 +95,7 @@ <tomeeClassifier>plus</tomeeClassifier> <libs> <lib>org.apache.tomee.chatterbox:chatterbox-nats-api:${project.version}:jar</lib> - <lib>org.tomitribe:tomitribe-util:1.1.0:jar</lib> - <lib>org.tomitribe:tomitribe-crest-api:0.3:jar</lib> + <lib>io.nats:java-nats-streaming:2.2.3:jar</lib> </libs> <apps> <app>org.apache.tomee.chatterbox:chatterbox-nats-rar:${project.version}:rar</app> diff --git a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/SystemBean.java b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/EchoBean.java similarity index 60% rename from chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/SystemBean.java rename to chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/EchoBean.java index 623b3f4..2c0e81a 100644 --- a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/SystemBean.java +++ b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/EchoBean.java @@ -18,16 +18,28 @@ package org.superbiz; import org.apache.tomee.chatterbox.nats.api.InboundListener; import org.apache.tomee.chatterbox.nats.api.NATSException; -import org.apache.tomee.chatterbox.nats.api.NATSMessage; +import io.nats.streaming.Message; +import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; +import java.io.IOException; +import java.nio.charset.StandardCharsets; -@MessageDriven(name = "Receiver") -public class SystemBean implements InboundListener { +@MessageDriven(name = "Echo", activationConfig = { + @ActivationConfigProperty(propertyName = "subject", propertyValue = "echo") +}) +public class EchoBean implements InboundListener { @Override - public void onMessage(NATSMessage message) throws NATSException { - // TODO: fill in implementation here + public void onMessage(final Message message) throws NATSException { + try { + final String text = new String(message.getData(), StandardCharsets.UTF_8); + System.out.println(text); + + message.ack(); + } catch (IOException e) { + throw new NATSException(e); + } } } diff --git a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java index 7ea3778..fa50cbc 100644 --- a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java +++ b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java @@ -29,6 +29,7 @@ import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.core.MediaType; +import java.nio.charset.StandardCharsets; @Singleton @Lock(LockType.READ) @@ -38,16 +39,16 @@ public class Sender { @Resource private NATSConnectionFactory cf; - @Path("{channel}") + @Path("{subject}") @POST @Consumes(MediaType.TEXT_PLAIN) - public void sendMessage(@PathParam("channel") final String channel, final String message) { + public void sendMessage(@PathParam("subject") final String subject, final String message) { try { final NATSConnection connection = cf.getConnection(); - connection.sendMessage(channel, message); + connection.publish(subject, message.getBytes(StandardCharsets.UTF_8)); connection.close(); - } catch (ResourceException e) { - // ignore + } catch (Throwable t) { + // TODO: log this } }
