Sorry, tied up with a couple of things here. Literally just opened the IDE to finish this off. :)
Jon

On Wed, Jun 16, 2021 at 9:46 AM Matthew Broadhead <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

> hi Jon,
>
> Is there anything else you need from my end? It feels like it is nearly
> finished. Could it pass the parameters in a similar way to how it is
> done in the chatterbox-imap? sending them in from the tomee.xml?
>
>
> On 10/06/2021 19:07, Jonathan Gallimore wrote:
> > Thank you! That worked. I have pushed an update to my code, and I've been
> > able to send a message to NATS from a REST endpoint, and receive a message
> > from NATS via an MDB.
> >
> > I still need to extract the cluster ID and client ID into properties for
> > the resource adapter, and of course, try and write up how this works. Code
> > is here: https://github.com/jgallimore/tomee-chatterbox/tree/nats, but I'll
> > merge it in once I have done these couple of changes.
> >
> > One other thing on my mind is that I'd like to try and find some way to
> > make all this easier. If you've used JMS, you've used JCA, possibly without
> > realizing it, but the spec still feels very hard to get into - I wonder if
> > there is anything we can propose in that regard to try and make creating
> > simple connectors a bit easier.
> >
> > Jon
> >
> > On Wed, Jun 9, 2021 at 4:31 PM Matthew Broadhead
> > <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >
> >> hi, it was my fault putting a confusing value in the docker compose file.
> >> it should work like this
> >>
> >> StreamingConnectionFactory cf = new StreamingConnectionFactory(
> >>         new Options.Builder().natsUrl("nats://localhost:4222")
> >>                 .clusterId("yourclientid").clientId("anything").build());
> >>
> >> but you could change the docker-compose.yml to have a -cid of
> >> yourclusterid and then do this
> >>
> >> StreamingConnectionFactory cf = new StreamingConnectionFactory(
> >>         new Options.Builder().natsUrl("nats://localhost:4222")
> >>                 .clusterId("yourclusterid").clientId("yourclientid").build());
> >>
> >> On 09/06/2021 17:15, Jonathan Gallimore wrote:
> >>> Thanks. If I can get that test going, I can probably get the rest working.
> >>> I suspect there are some other bugs in there.
> >>>
> >>> Jon
> >>>
> >>> On Wed, Jun 9, 2021 at 4:04 PM Matthew Broadhead
> >>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >>>
> >>>> Thanks I will have a look now
> >>>>
> >>>> On 09/06/2021 15:19, Jonathan Gallimore wrote:
> >>>>> Ok, I'm stuck. If I boot up a NATS server with your docker-compose.yml
> >>>>> file, and run the following test:
> >>>>>
> >>>>> @Test
> >>>>> public void testShouldConnect() throws Exception {
> >>>>>     StreamingConnectionFactory cf = new StreamingConnectionFactory(
> >>>>>             new Options.Builder().natsUrl("nats://localhost:4222")
> >>>>>                     .clusterId("cluster-id").clientId("yourclientid").build());
> >>>>>     final StreamingConnection connection = cf.createConnection();
> >>>>>     Assert.assertNotNull(connection);
> >>>>>
> >>>>>     connection.close();
> >>>>> }
> >>>>>
> >>>>> It fails with a timeout.
> >>>>>
> >>>>> I monitored the connection with wireshark, and see the following
> >>>>>
> >>>>> < = from server to client
> >>>>> > = from client to server
> >>>>>
> >>>>> <INFO
> >>>>> <{"server_id":"NDMRYDSGUSH2QR6SZWMFB44ND5CODXGKNYTQ5IPLUGYUDBI6G54CIGF6","server_name":"NDMRYDSGUSH2QR6SZWMFB44ND5CODXGKNYTQ5IPLUGYUDBI6G54CIGF6","version":"2.1.4","proto":1,"git_commit":"fb009af","go":"go1.13.7","host":"0.0.0.0","port":4222,"max_payload":1048576,"client_id":10}
> >>>>> > CONNECT {"lang":"java","version":"2.6.5","name":"yourclientid","protocol":1,"verbose":false,"pedantic":false,"tls_required":false,"echo":true}
> >>>>> > PING
> >>>>> <PONG
> >>>>> > SUB _STAN.acks.LP4bdY88abuVJ19Qo5HVuk 1
> >>>>> > SUB _INBOX.LP4bdY88abuVJ19Qo5HVn8 2
> >>>>> > SUB _INBOX.LP4bdY88abuVJ19Qo5HVqw 3
> >>>>> > SUB _INBOX.F0vVy1N0sQM3xseeEWMIAL.* 4
> >>>>> > PUB _STAN.discover.cluster-id _INBOX.F0vVy1N0sQM3xseeEWMIAL.F0vVy1N0sQM3xseeEWMISH 75
> >>>>> .yourclientid.._INBOX.LP4bdY88abuVJ19Qo5HVn8..".LP4bdY88abuVJ19Qo5HVjK(.0.
> >>>>> <PING
> >>>>> > PONG
> >>>>> > UNSUB 1
> >>>>> > UNSUB 2
> >>>>> > UNSUB 3
> >>>>>
> >>>>> So there does appear to be some communication between my test and the NATS
> >>>>> server - I have no idea why it times out.
> >>>>>
> >>>>> My code is here if you want to have a go:
> >>>>> https://github.com/jgallimore/tomee-chatterbox/tree/nats
> >>>>>
> >>>>> Jon
> >>>>>
> >>>>> On Wed, Jun 9, 2021 at 11:56 AM Jonathan Gallimore <jonathan.gallim...@gmail.com> wrote:
> >>>>>
> >>>>>> Never mind, I figured out my mistake. I'll post back when I have something
> >>>>>> going.
> >>>>>>
> >>>>>> Jon
> >>>>>>
> >>>>>> On Wed, Jun 9, 2021 at 11:44 AM Jonathan Gallimore <jonathan.gallim...@gmail.com> wrote:
> >>>>>>
> >>>>>>> I think I have something wired up, but when executing this:
> >>>>>>>
> >>>>>>> cf = new StreamingConnectionFactory(
> >>>>>>>         new Options.Builder().natsUrl(baseAddress)
> >>>>>>>                 .clusterId("cluster-id").clientId("client-id").build());
> >>>>>>>
> >>>>>>> connection = cf.createConnection();
> >>>>>>>
> >>>>>>> connection is null. Any pointers?
> >>>>>>>
> >>>>>>> Jon
> >>>>>>>
> >>>>>>> On Wed, Jun 9, 2021 at 8:16 AM Matthew Broadhead
> >>>>>>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >>>>>>>
> >>>>>>>> i have never used a JCA adapter before. is it loaded in using the
> >>>>>>>> tomee.xml as a Resource? and then injected into a singleton for
> >>>>>>>> subscribing to messages?
> >>>>>>>>
> >>>>>>>> On 08/06/2021 17:15, Jonathan Gallimore wrote:
> >>>>>>>>> Definitely sounds like a good case for a JCA adapter. I'll take a quick
> >>>>>>>>> swing at hooking up an example for you.
> >>>>>>>>>
> >>>>>>>>> Jon
> >>>>>>>>>
> >>>>>>>>> On Tue, Jun 8, 2021 at 9:02 AM Matthew Broadhead
> >>>>>>>>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Jon,
> >>>>>>>>>>
> >>>>>>>>>> NATS is basically a message queue, like ActiveMQ I suppose.
> >>>>>>>>>>
> >>>>>>>>>> I included the adapter into the project using maven
> >>>>>>>>>> <dependency>
> >>>>>>>>>>     <groupId>io.nats</groupId>
> >>>>>>>>>>     <artifactId>java-nats-streaming</artifactId>
> >>>>>>>>>>     <version>2.2.3</version>
> >>>>>>>>>> </dependency>
> >>>>>>>>>>
> >>>>>>>>>> i started up a nats server using docker.
> >>>>>>>>>> here is my docker-compose.yml
> >>>>>>>>>> version: '3.1'
> >>>>>>>>>> services:
> >>>>>>>>>>   nats-docker:
> >>>>>>>>>>     image: nats-streaming:0.17.0
> >>>>>>>>>>     restart: always
> >>>>>>>>>>     command:
> >>>>>>>>>>       - '-p'
> >>>>>>>>>>       - '4222'
> >>>>>>>>>>       - '-m'
> >>>>>>>>>>       - '8222'
> >>>>>>>>>>       - '-hbi'
> >>>>>>>>>>       - '5s'
> >>>>>>>>>>       - '-hbt'
> >>>>>>>>>>       - '5s'
> >>>>>>>>>>       - '-hbf'
> >>>>>>>>>>       - '2'
> >>>>>>>>>>       - '-SD'
> >>>>>>>>>>       - '-cid'
> >>>>>>>>>>       - 'yourclientid'
> >>>>>>>>>>     environment:
> >>>>>>>>>>       TZ: Europe/London
> >>>>>>>>>>       LANG: en_GB.UTF-8
> >>>>>>>>>>       LANGUAGE: en_GB:en
> >>>>>>>>>>       LC_ALL: en_GB.UTF-8
> >>>>>>>>>>     ports:
> >>>>>>>>>>       - '4222:4222'
> >>>>>>>>>>       - '8222:8222'
> >>>>>>>>>>     expose:
> >>>>>>>>>>       - 4222
> >>>>>>>>>>       - 8222
> >>>>>>>>>>     networks:
> >>>>>>>>>>       - backend
> >>>>>>>>>> networks:
> >>>>>>>>>>   backend:
> >>>>>>>>>>     driver: bridge
> >>>>>>>>>>
> >>>>>>>>>> JCA sounds good if it solves the threading issue. it is very kind of
> >>>>>>>>>> you to offer to help write an adapter. looking at the code you sent it
> >>>>>>>>>> looks complicated but i can have a stab at it if you don't have much time
> >>>>>>>>>>
> >>>>>>>>>> let me know if you need more info
> >>>>>>>>>>
> >>>>>>>>>> Matthew
> >>>>>>>>>>
> >>>>>>>>>> On 07/06/2021 17:48, Jonathan Gallimore wrote:
> >>>>>>>>>>> At the risk of sounding a bit ignorant... what is NATS?
> >>>>>>>>>>>
> >>>>>>>>>>> From what I can tell, it sounds like you're receiving a stream of events
> >>>>>>>>>>> (over websocket) and want to do some processing in an EJB or CDI bean for
> >>>>>>>>>>> each event. The connection to the NATS server isn't in the context of a
> >>>>>>>>>>> HTTP (or any other type of) request, and just runs all the time while the
> >>>>>>>>>>> server is running - does that sound about right?
> >>>>>>>>>>>
> >>>>>>>>>>> Assuming that sounds right, it sounds a bit like the Slack JCA connector I
> >>>>>>>>>>> wrote a while back:
> >>>>>>>>>>> https://github.com/apache/tomee-chatterbox/tree/master/chatterbox-slack
> >>>>>>>>>>> Essentially, the resource adapter connects to slack and runs all the time.
> >>>>>>>>>>> Messages that come into the server from slack are processed in MDBs that
> >>>>>>>>>>> implement the InboundListener interface.
> >>>>>>>>>>>
> >>>>>>>>>>> JCA certainly feels complex, especially when compared with your
> >>>>>>>>>>> Singleton @Startup bean approach, but I usually find that if I try and work
> >>>>>>>>>>> with threads in EJBs, things usually go in the wrong direction. Conversely,
> >>>>>>>>>>> JCA even gives you a work manager to potentially handle that stuff.
> >>>>>>>>>>>
> >>>>>>>>>>> If you can give me some pointers to running a NATS server, I'd be happy to
> >>>>>>>>>>> help with a sample adapter and application.
> >>>>>>>>>>>
> >>>>>>>>>>> Jon
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Jun 7, 2021 at 11:49 AM Matthew Broadhead
> >>>>>>>>>>> <matthew.broadh...@nbmlaw.co.uk.invalid> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I am trying to subscribe to a NATS streaming server with
> >>>>>>>>>>>> https://github.com/nats-io/stan.java which is java.lang.AutoCloseable.
> >>>>>>>>>>>> At first it wasn't closing properly as seen in my original gist:
> >>>>>>>>>>>> https://gist.github.com/chongma/2a3ab451f2aeabc98340a9b897394cfe
> >>>>>>>>>>>> This was solved with this
> >>>>>>>>>>>> https://stackoverflow.com/questions/39080296/hazelcast-threads-prevent-tomee-from-stopping
> >>>>>>>>>>>> creating a default producer:
> >>>>>>>>>>>> @ApplicationScoped
> >>>>>>>>>>>> public class NatsConnectionProducer {
> >>>>>>>>>>>>
> >>>>>>>>>>>>     @Resource(name = "baseAddressNats")
> >>>>>>>>>>>>     private String baseAddressNats;
> >>>>>>>>>>>>
> >>>>>>>>>>>>     @Produces
> >>>>>>>>>>>>     @ApplicationScoped
> >>>>>>>>>>>>     public StreamingConnection instance() throws IOException, InterruptedException {
> >>>>>>>>>>>>         StreamingConnectionFactory cf = new StreamingConnectionFactory(
> >>>>>>>>>>>>                 new Options.Builder().natsUrl(baseAddressNats)
> >>>>>>>>>>>>                         .clusterId("cluster-id").clientId("client-id").build());
> >>>>>>>>>>>>         return cf.createConnection();
> >>>>>>>>>>>>     }
> >>>>>>>>>>>>
> >>>>>>>>>>>>     public void destroy(@Disposes final StreamingConnection instance)
> >>>>>>>>>>>>             throws IOException, TimeoutException, InterruptedException {
> >>>>>>>>>>>>         instance.close();
> >>>>>>>>>>>>     }
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> But now i am creating a new thread because any injections with JPA had
> >>>>>>>>>>>> caching issues and this seems to work but i am not sure it is
> >>>>>>>>>>>> broadcasting to websockets correctly
> >>>>>>>>>>>> @Singleton
> >>>>>>>>>>>> @Lock(LockType.READ)
> >>>>>>>>>>>> @Startup
> >>>>>>>>>>>> public class SchedulerEvents {
> >>>>>>>>>>>>     private static final Logger log = Logger.getLogger(SchedulerEvents.class.getName());
> >>>>>>>>>>>>
> >>>>>>>>>>>>     @Inject
> >>>>>>>>>>>>     private StreamingConnection streamingConnection;
> >>>>>>>>>>>>
> >>>>>>>>>>>>     @Inject
> >>>>>>>>>>>>     private SomeController someController;
> >>>>>>>>>>>>
> >>>>>>>>>>>>     @PostConstruct
> >>>>>>>>>>>>     private void construct() {
> >>>>>>>>>>>>         // log.fine(Thread.currentThread().getName());
> >>>>>>>>>>>>         try {
> >>>>>>>>>>>>             streamingConnection.subscribe("scheduler:notify", new MessageHandler() {
> >>>>>>>>>>>>                 @Override
> >>>>>>>>>>>>                 public void onMessage(Message m) {
> >>>>>>>>>>>>                     try {
> >>>>>>>>>>>>                         log.fine(Thread.currentThread().getName());
> >>>>>>>>>>>>                         // this needs to spawn a new thread otherwise injections are stale
> >>>>>>>>>>>>                         Thread thread = new Thread(new Runnable() {
> >>>>>>>>>>>>                             public void run() {
> >>>>>>>>>>>>                                 log.fine(Thread.currentThread().getName());
> >>>>>>>>>>>>                                 process(m.getData());
> >>>>>>>>>>>>                             }
> >>>>>>>>>>>>                         });
> >>>>>>>>>>>>                         thread.start();
> >>>>>>>>>>>>                         while (thread.isAlive()) {
> >>>>>>>>>>>>                             // wait
> >>>>>>>>>>>>                         }
> >>>>>>>>>>>>                         log.fine("Thread finished OK");
> >>>>>>>>>>>>                         m.ack();
> >>>>>>>>>>>>                     } catch (Exception e) {
> >>>>>>>>>>>>                         emailController.emailStackTrace(e);
> >>>>>>>>>>>>                     }
> >>>>>>>>>>>>                 }
> >>>>>>>>>>>>             }, new SubscriptionOptions.Builder().startWithLastReceived().manualAcks()
> >>>>>>>>>>>>                     .ackWait(Duration.ofSeconds(60)).durableName("scheduler-service").build());
> >>>>>>>>>>>>         } catch (IOException | InterruptedException | TimeoutException e) {
> >>>>>>>>>>>>             e.printStackTrace();
> >>>>>>>>>>>>         }
> >>>>>>>>>>>>     }
> >>>>>>>>>>>>
> >>>>>>>>>>>>     private void process(byte[] data) {
> >>>>>>>>>>>>         String raw = new String(data);
> >>>>>>>>>>>>         JsonReader jsonReader = Json.createReader(new StringReader(raw));
> >>>>>>>>>>>>         JsonObject jo = jsonReader.readObject();
> >>>>>>>>>>>>         jsonReader.close();
> >>>>>>>>>>>>         String type = utilityDao.readJsonString(jo, "type");
> >>>>>>>>>>>>         int id = utilityDao.readJsonInteger(jo, "id");
> >>>>>>>>>>>>         if (type == null || id == 0) {
> >>>>>>>>>>>>             emailController.emailThrowable(new Throwable(), raw);
> >>>>>>>>>>>>             return;
> >>>>>>>>>>>>         }
> >>>>>>>>>>>>         log.info("Received a message: id: " + id + ", type:" + type);
> >>>>>>>>>>>>         DefaultServerEndpointConfigurator dsec = new DefaultServerEndpointConfigurator();
> >>>>>>>>>>>>         SomeWebSocket nws = dsec.getEndpointInstance(SomeWebSocket.class);
> >>>>>>>>>>>>         nws.broadcast(ja.toString());
> >>>>>>>>>>>>     }
> >>>>>>>>>>>>
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> what is the best way to use an AutoCloseable?
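
A side note on the `while (thread.isAlive())` loop in the original post: it keeps the calling thread spinning on a CPU core until the worker finishes. Below is a minimal sketch, using only the JDK, of waiting with `Thread.join()` instead. The class name `JoinInsteadOfSpin`, the helper `runAndWait`, and the demo payload are hypothetical and not part of the thread's code, and this does not address the container-managed threading concerns that the JCA adapter is meant to solve.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; not code from the thread.
class JoinInsteadOfSpin {

    /** Runs the task on a new thread and blocks until that thread has finished. */
    static void runAndWait(Runnable task) throws InterruptedException {
        Thread worker = new Thread(task, "message-worker");
        worker.start();
        // join() parks the caller until the worker terminates,
        // rather than polling isAlive() in a busy loop.
        worker.join();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<String> result = new AtomicReference<>();
        // Made-up payload standing in for a message body.
        byte[] data = "{\"type\":\"demo\",\"id\":1}".getBytes(StandardCharsets.UTF_8);

        runAndWait(() -> result.set(new String(data, StandardCharsets.UTF_8)));

        // The worker has finished by the time join() returns, so its result is visible here.
        System.out.println("Processed: " + result.get());
    }
}
```

In a message handler, the acknowledgement would then follow the `runAndWait(...)` call, in the same place it follows the loop in the original post.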