Romain has pointer me to this file which seems to do what I want but I can't make it work
https://github.com/apache/tomee/blob/861e65ba7fe11f28c52a020f26bfe368a294f665/container/openejb-core/src/test/java/org/apache/openejb/config/ConnectorProxyTest.java

I can't get any of my other tests running since I added the RA.  it just blows up :(

On 28/06/2021 23:32, Jonathan Gallimore wrote:
Do you have a test you can post? Off the top of my head, I'm not sure, but
I imagine we can figure something out.

P.S. thanks for the PR - I merged it in. Is that your first TomEE commit?
If so, congratulations :)

Jon

On Mon, 28 Jun 2021, 22:04 Matthew Broadhead,
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

Hi Jon,

Have you got any tips for mocking the NATSConnectionFactory?  I am
trying to start doing some tests with SingleApplicationComposerRunner
and I get

org.apache.openejb.OpenEJBException: Can't find resource for class
tld.domain.controllers.message.SchedulerController#cf. (No provider
available for resource-ref 'null' of type
'org.apache.tomee.chatterbox.nats.api.NATSConnectionFactory' for
'NotificationTimer'.)
      at

org.apache.openejb.config.AutoConfig.processResourceRef(AutoConfig.java:1224)
      at org.apache.openejb.config.AutoConfig.deploy(AutoConfig.java:892)
      at org.apache.openejb.config.AutoConfig.deploy(AutoConfig.java:200)
      at

org.apache.openejb.config.ConfigurationFactory$Chain.deploy(ConfigurationFactory.java:420)
      at

org.apache.openejb.config.ConfigurationFactory.configureApplication(ConfigurationFactory.java:1033)
      at

org.apache.openejb.testing.ApplicationComposers.deployApp(ApplicationComposers.java:723)
      at

org.apache.openejb.testing.SingleApplicationComposerRunner$2.deployApp(SingleApplicationComposerRunner.java:148)
      at

org.apache.openejb.testing.ApplicationComposers.before(ApplicationComposers.java:386)
      at

org.apache.openejb.testing.SingleApplicationComposerRunner.start(SingleApplicationComposerRunner.java:169)
      at

org.apache.openejb.testing.SingleApplicationComposerRunner.access$100(SingleApplicationComposerRunner.java:43)
      at

org.apache.openejb.testing.SingleApplicationComposerRunner$1$1.evaluate(SingleApplicationComposerRunner.java:103)
      at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      at

org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
      at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
      at

org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
      at

org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
      at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
      at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
      at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
      at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
      at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
      at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
      at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
      at

org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
      at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
      at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
      at java.util.Iterator.forEachRemaining(Iterator.java:116)
      at

java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
      at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
      at

java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
      at

java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
      at

java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
      at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      at
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
      at

org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:82)
      at

org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:73)
      at

org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
      at

org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
      at

org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
      at

org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
      at

org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
      at

org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
      at

org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:84)
      at

org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:98)
      at

org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40)
      at

org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:541)
      at

org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:768)
      at

org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:464)
      at

org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)



On 23/06/2021 13:14, Jonathan Gallimore wrote:
Hi Matthew

Can you submit a PR against https://github.com/apache/tomee-chatterbox?

Looking into embedding the rar into the webapp, but as it stands right at
the moment, I'm either doing something wrong, or hitting an unusual bug
with the resource creation. I'm debugging through it.

Jon

On Fri, Jun 18, 2021 at 7:39 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

I have pointed WST to the TomEE install and that works fine.  it might
be nice to embed the rar into the webapp.  does that work for production
as well?

where do i submit the PR?
https://github.com/jgallimore/tomee-chatterbox
or https://github.com/apache/tomee-chatterbox ?  it is by no means
complete but it allows me to set ackWait and durableName


On 17/06/2021 15:37, Jonathan Gallimore wrote:
On Thu, Jun 17, 2021 at 1:48 PM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

i added ackWait and durableName as Strings in NATSActivationSpec so it
is working for me ok.

Very nice! Can you send a PR for that? I'll get it merged in. Nice work
:).
i have managed to deploy TomEE with docker fine.  But when deploying
with Eclipse WST I have to manually copy the rar into the apps folder
in
org.eclipse.wst.server.core.  Is there a way to automate the process
using the launch configuration?

It's been a while since I've used Eclipse, so the short answer is "I
don't
know". I tended to point WST to use a TomEE install, and to actually
take
control of the install (as opposed to using a folder
in org.eclipse.wst.server.core). That way you'd just set it up once.

There is one other alternative, which is to embed the .rar in the
webapp
itself. I'll hack up an example this evening for this adapter, but the
general gist of it is that you'd include the chatterbox-nats-api and
chatterbox-nats-impl jars in WEB-INF/lib (along with the dependencies),
and
configure the resource adapter in WEB-INF/resources.xml (which is
basically
a version of tomee.xml that is local to the application). Its
similar-ish
to packaging the rar in an ear, but keeps your deployable as a rar (and
should work with WST).

Jon



On 17/06/2021 11:52, Matthew Broadhead wrote:
maybe it is possible to put the Subscription option in the
NATSActivationSpec

On 17/06/2021 09:43, Matthew Broadhead wrote:
Hi Jon,

I was wondering how to set the SubscriptionOptions.  If you look at
the example below they are the third argument to the subscribe
function after the MessageHandler.  In my case I might want to have
control over the ackWait time and switch between manualAcks and
autoAcks.  The durable name is quite important too.
startWithLastReceived can be changed to getting all the messages or
some number of messages.  Sorry if I didn't mention these before

streamingConnection.subscribe("scheduler:notify", new
MessageHandler() {
       @Override
       public void onMessage(Message m) {
           ...
           m.ack();
       }
}, new

SubscriptionOptions.Builder().startWithLastReceived().manualAcks().ackWait(Duration.ofSeconds(60))
                       .durableName("scheduler-service").build());

On 16/06/2021 16:43, Jonathan Gallimore wrote:
Pushed. I've made client Id and cluster id configuration options
and
added
a README. If it working for you, I'd suggest we cut a 0.3 release
unless
there are any objections.

Jon

On Wed, Jun 16, 2021 at 12:16 PM Jonathan Gallimore <
jonathan.gallim...@gmail.com> wrote:

Sorry, tied up with a couple of things here. Literally just opened
the IDE
to finish this off. :)

Jon

On Wed, Jun 16, 2021 at 9:46 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

hi Jon,

Is there anything else you need from my end?  It feels like it is
nearly
finished.  Could it pass the parameters in a similar way to how
it
is
done in the chatterbox-imap?  sending them in from the tomee.xml?


On 10/06/2021 19:07, Jonathan Gallimore wrote:
Thank you! That worked. I have pushed an update to my code, and
I've
been
able to send a message to NATS from a REST endpoint, and
receive a
message
from NATS via an MDB.

I still need to extract the cluster ID and client ID into
properties for
the resource adapter, and of course, try and write up how this
works.
Code
is here:
https://github.com/jgallimore/tomee-chatterbox/tree/nats, but
I'll
merge it in once I have done these couple of changes.

One other thing on my mind is that I'd like to try and find some
way to
make all this easier. If you've used JMS, you've used JCA,
possibly
without
realizing it, but the spec still feels very hard to get into - I
wonder
if
there is anything we can propose in that regard to try and make
creating
simple connectors a bit easier.

Jon

On Wed, Jun 9, 2021 at 4:31 PM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

hi, it was my fault putting a confusing value in the docker
compose
file.  it should work like this

StreamingConnectionFactory cf = new
       StreamingConnectionFactory(new
Options.Builder().natsUrl("nats://localhost:4222")
.clusterId("yourclientid").clientId("anything").build());

but you could change the docker-compose.yml to have a -cid of
yourclusterid and then do this

StreamingConnectionFactory cf = new
       StreamingConnectionFactory(new
Options.Builder().natsUrl("nats://localhost:4222")
.clusterId("yourclusterid").clientId("yourclientid").build());

On 09/06/2021 17:15, Jonathan Gallimore wrote:
Thanks. If I can get that test going, I can probably get the
rest
working.
I suspect there are some other bugs in there.

Jon

On Wed, Jun 9, 2021 at 4:04 PM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

Thanks I will have a look now

On 09/06/2021 15:19, Jonathan Gallimore wrote:
Ok, I'm stuck. If I boot up a NATS server with your
docker-compose.yml
file, and run the following test:

           @Test
           public void testShouldConnect() throws Exception {
               StreamingConnectionFactory cf = new
StreamingConnectionFactory(new
Options.Builder().natsUrl("nats://localhost:4222")

.clusterId("cluster-id").clientId("yourclientid").build());
               final StreamingConnection connection =
cf.createConnection();
Assert.assertNotNull(connection);

               connection.close();
           }

It fails with a timeout.

I monitored the connection with wireshark, and see the
following
< = from server to client
= from client to server
<INFO

<{"server_id":"NDMRYDSGUSH2QR6SZWMFB44ND5CODXGKNYTQ5IPLUGYUDBI6G54CIGF6","server_name":"NDMRYDSGUSH2QR6SZWMFB44ND5CODXGKNYTQ5IPLUGYUDBI6G54CIGF6","version":"2.1.4","proto":1,"git_commit":"fb009af","go":"go1.13.7","host":"0.0.0.0","port":4222,"max_payload":1048576,"client_id":10}
CONNECT
{"lang":"java","version":"2.6.5","name":"yourclientid","protocol":1,"verbose":false,"pedantic":false,"tls_required":false,"echo":true}
PING
<PONG
SUB _STAN.acks.LP4bdY88abuVJ19Qo5HVuk 1
SUB _INBOX.LP4bdY88abuVJ19Qo5HVn8 2
SUB _INBOX.LP4bdY88abuVJ19Qo5HVqw 3
SUB _INBOX.F0vVy1N0sQM3xseeEWMIAL.* 4
PUB _STAN.discover.cluster-id
_INBOX.F0vVy1N0sQM3xseeEWMIAL.F0vVy1N0sQM3xseeEWMISH 75
.yourclientid.._INBOX.LP4bdY88abuVJ19Qo5HVn8..".LP4bdY88abuVJ19Qo5HVjK(.0.
<PING
PONG
UNSUB 1
UNSUB 2
UNSUB 3
So there does appear to be some communication between my
test
and
the
NATS
server - I have no idea why it times out.

My code is here if you want to have a go:
https://github.com/jgallimore/tomee-chatterbox/tree/nats

Jon

On Wed, Jun 9, 2021 at 11:56 AM Jonathan Gallimore <
jonathan.gallim...@gmail.com> wrote:

Nervermind, I figured out my mistake. I'll post back when I
have
something
going.

Jon

On Wed, Jun 9, 2021 at 11:44 AM Jonathan Gallimore <
jonathan.gallim...@gmail.com> wrote:

I think I have something wired up, but when executing
this:
                   cf = new
StreamingConnectionFactory(new
Options.Builder().natsUrl(baseAddress)

.clusterId("cluster-id").clientId("client-id").build());

                   connection = cf.createConnection();


connection is null. Any pointers?

Jon

On Wed, Jun 9, 2021 at 8:16 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

i have never used a JCA adapter before.  is it loaded in
using
the
tomee.xml as a Resource?  and then injected into a
singleton for
subscribing to messages?

On 08/06/2021 17:15, Jonathan Gallimore wrote:
Definitely sounds like a good case for a JCA adapter.
I'll take
a
quick
swing at hooking up an example for you.

Jon

On Tue, Jun 8, 2021 at 9:02 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

Hi Jon,

NATS is basically a message queue, like ActiveMQ I
suppose.
I included the adapter into the project using maven
<dependency>
<groupId>io.nats</groupId>
<artifactId>java-nats-streaming</artifactId>
<version>2.2.3</version>
</dependency>

i started up a nats server using docker.  here is my
docker-compose.yml
version: '3.1'
services:
           nats-docker:
             image: nats-streaming:0.17.0
             restart: always
             command:
               - '-p'
               - '4222'
               - '-m'
               - '8222'
               - '-hbi'
               - '5s'
               - '-hbt'
               - '5s'
               - '-hbf'
               - '2'
               - '-SD'
               - '-cid'
               - 'yourclientid'
             environment:
               TZ: Europe/London
               LANG: en_GB.UTF-8
               LANGUAGE: en_GB:en
               LC_ALL: en_GB.UTF-8
             ports:
               - '4222:4222'
               - '8222:8222'
             expose:
               - 4222
               - 8222
             networks:
               - backend
networks:
           backend:
             driver: bridge

JCA sounds good if it solves the threading issue.  it
is
very
kind
of
you to offer to help write an adapter.  looking at the
code you
sent
it
looks complicated but i can have a stab at it if you
don't have
much
time
let me know if you need more info

Matthew

On 07/06/2021 17:48, Jonathan Gallimore wrote:
At the risk of sounding a bit ignorant... what is
NATS?
         From what I can tell, it sounds like you're
receiving a
stream
of
events
(over websocket) and want to do some processing in an
EJB or
CDI
bean for
each event. The connection to the NATS server isn't in
the
context
of a
HTTP (or any other type of) request, and just runs all
the
time
while the
server is running - does that sound about right?

Assuming that sounds right, it sounds a bit like the
Slack JCA
connector
I
wrote a while back:

https://github.com/apache/tomee-chatterbox/tree/master/chatterbox-slack
.
Essentially, the resource adapter connects to slack
and
runs
all
the
time.
Messages that come into the server from slack are
processed in
MDBs
that
implement the InboundListener interface.

JCA certainly feels complex, especially when compared
with
your
Singleton @Startup bean approach, but I usually find
that if I
try
and
work
with threads in EJBs, things usually go in the wrong
direction.
Conversely,
JCA even gives you a work manager to potentially
handle
that
stuff.
If you can give me some pointers to running a NATS
server,
I'd be
happy
to
help with a sample adapter and application.

Jon

On Mon, Jun 7, 2021 at 11:49 AM Matthew Broadhead
<matthew.broadh...@nbmlaw.co.uk.invalid> wrote:

I am trying to subscribe to a NATS streaming server
with
https://github.com/nats-io/stan.java which is
java.lang.Autocloseable.
At first it wasn't closing properly as seen in my
original
gist:
https://gist.github.com/chongma/2a3ab451f2aeabc98340a9b897394cfe
This was solved with this


https://stackoverflow.com/questions/39080296/hazelcast-threads-prevent-tomee-from-stopping
creating a default producer:
@ApplicationScoped
public class NatsConnectionProducer {

              @Resource(name = "baseAddressNats")
              private String baseAddressNats;

              @Produces
              @ApplicationScoped
              public StreamingConnection instance()
throws
IOException,
InterruptedException {
StreamingConnectionFactory cf = new
StreamingConnectionFactory(new
Options.Builder().natsUrl(baseAddressNats)
.clusterId("cluster-id").clientId("client-id").build());
                  return cf.createConnection();
              }

              public void destroy(@Disposes final
StreamingConnection
instance)
throws IOException, TimeoutException,
InterruptedException {
instance.close();
              }
}

But now i am creating a new thread because any
injections
with
JPA
had
cacheing issues and this seems to work but i am not
sure it
is
broadcasting to websockets correctly
@Singleton
@Lock(LockType.READ)
@Startup
public class SchedulerEvents {
              private static final Logger log =
Logger.getLogger(SchedulerEvents.class.getName());

              @Inject
              private StreamingConnection
streamingConnection;

              @Inject
              private SomeController someController;

              @PostConstruct
              private void construct() {
// log.fine(Thread.currentThread().getName());
                  try {

streamingConnection.subscribe("scheduler:notify",
new
MessageHandler() {
                          @Override
                          public void
onMessage(Message
m) {
                              try {

log.fine(Thread.currentThread().getName());
// this needs to spawn a new
thread
otherwise
injections are stale
Thread thread = new Thread(new
Runnable() {
public void run() {
log.fine(Thread.currentThread().getName());

process(m.getData());
}
                                  });
thread.start();
while (thread.isAlive()) {
// wait
                                  }
log.fine("Thread finished OK");
m.ack();
                              } catch (Exception e) {

     emailController.emailStackTrace(e);
}
                          }
                      }, new


SubscriptionOptions.Builder().startWithLastReceived().manualAcks().ackWait(Duration.ofSeconds(60))
.durableName("scheduler-service").build());
} catch (IOException | InterruptedException |
TimeoutException e)
{
e.printStackTrace();
                  }
              }

              private void process(byte[] data) {
                  String raw = new String(data);
                  JsonReader jsonReader =
Json.createReader(new
StringReader(raw));
JsonObject jo = jsonReader.readObject();
                  jsonReader.close();
                  String type =
utilityDao.readJsonString(jo,
"type");
int id = utilityDao.readJsonInteger(jo, "id");
                  if (type == null || id == 0) {
emailController.emailThrowable(new
Throwable(),
raw);
return;
                  }
                  log.info("Received a message: id:
" +
id + ",
type:"
+
type);
DefaultServerEndpointConfigurator dsec = new
DefaultServerEndpointConfigurator();
                 SomeWebSocket nws =
dsec.getEndpointInstance(SomeWebSocket.class);
nws.broadcast(ja.toString());
              }

}

what is the best way to use an autocloseable?




Reply via email to