coryvirok opened a new issue #133:
URL: https://github.com/apache/pulsar-client-node/issues/133


   First off, thanks for Pulsar and all of these client libs! I'm excited to 
vet Pulsar for my company and see if it solves some of Kafka's shortcomings. 
   
   
   I have a gRPC service that implements a server-streaming RPC. This RPC reads 
from a Pulsar topic and streams the data from the topic to the gRPC client. 
   
   This all works well until I try to gracefully shut down the gRPC server. 
I'm finding that the server hangs due to the Pulsar Consumer's synchronous 
`receive()` method. The only way I can get it to work is to use 
`consumer.receive(timeout)` and check the exception message, which seems fairly 
non-idiomatic for Node.js and makes complex, nested shutdown logic difficult to 
handle in a real application.
   
   Here's a simplified example:
   
   ```js
   import Pulsar from "pulsar-client";
   
   const pulsarClient = new Pulsar.Client({
     serviceUrl: "pulsar://127.0.0.1:6650",
   });
   
   async function main() {
     const consumer = await pulsarClient.subscribe({
       topic: `test`,
       subscription: "default",
     });
   
     while (true) {
       try {
         // Since receive() is blocking, use this version to give up control and allow
         // the while loop to check if it should finish.
         await consumer.receive(1000);  
         console.log("received message");
       } catch (e) {
         // We are forced to use timeout exceptions and `e.message` matching to know when we should
         // retry the call to `receive(timeout)`
         if (e.message === "Failed to received message TimeOut") {
           console.log("receive() timed out, retrying");
           continue;
         } else if (e.message === "Failed to received message AlreadyClosed") {
           // We are forced to use another exception and `e.message` matching to know when the
           // consumer was closed by the SIGINT handler
           console.log("consumer closed, breaking loop");
           break;
         }
         // Some other, legit exception that we should handle somewhere but has nothing to do with
         // control flow
         throw e;
       }
     }
   
     // Do whatever cleanup needs to happen
     console.log("CLEANING UP....");
   
     console.log("returning from main");
   }
   
   // Trap Ctrl-C from the keyboard
   // This could also be `process.on("beforeExit", () => {})` or any other signal or process-event handler
   //
   // Try commenting this out and checking whether "CLEANING UP...." is logged (spoiler: it isn't)
   process.on("SIGINT", async () => {
     console.log("shutting down");
     await pulsarClient.close();
   });
   
   await main();
   ```
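
   One way to keep the message matching out of the main loop is to hide it inside an async generator. This is a user-land sketch, not part of the `pulsar-client` API: `receiveUntilClosed` is a hypothetical helper name, and it assumes the consumer rejects with exactly the two error strings used in the example above (they may differ between client versions).

   ```js
   // Assumption: these are the exact rejection messages observed in the example
   // above; other pulsar-client versions may word them differently.
   const TIMEOUT_MESSAGE = "Failed to received message TimeOut";
   const CLOSED_MESSAGE = "Failed to received message AlreadyClosed";

   // Hypothetical helper: yields messages until the consumer (or its client) is
   // closed. Timeouts are swallowed and the receive is retried; any other error
   // is re-thrown to whoever is iterating with `for await`.
   async function* receiveUntilClosed(consumer, timeoutMs = 1000) {
     while (true) {
       let msg;
       try {
         msg = await consumer.receive(timeoutMs);
       } catch (e) {
         if (e.message === TIMEOUT_MESSAGE) continue; // nothing arrived yet, poll again
         if (e.message === CLOSED_MESSAGE) return; // consumer closed: end the iteration
         throw e; // a real error, not control flow
       }
       // Yield outside the try so errors thrown by the loop body are not
       // mistaken for receive() failures.
       yield msg;
     }
   }
   ```

   With this helper, the loop in `main()` could shrink to `for await (const msg of receiveUntilClosed(consumer)) { ... }`, and the cleanup code runs once the SIGINT handler closes the client and the generator returns. Any error other than the two matched ones still propagates out of the `for await` loop.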
   
   I'm not a Node.js expert, but in my experience streams are usually managed 
via events. This pattern lets the developer write event-driven code instead 
of imperative loops like the one above. That helps because it doesn't require 
exception handling and message matching to determine control flow, and it lets 
the developer call `consumer.close()` without worrying whether the 
`consumer.receive()` sync method is blocking the event loop. 
   
   I understand the desire to use async/await here, and I'm a HUGE fan of the 
pattern. But I think it's important for `consumer.receive()` to be async 
instead of sync. Otherwise, we're left having to write these loops using the 
timeout version, `receive(timeout)`.
   
   An example of how I'd expect to be able to use the Pulsar Consumer:
   
   ```js
   const consumer = await pulsarClient.subscribe({
     topic: `test`,
     subscription: "default",
   });
   
   consumer.on('data', (msg) => {
     // handle message from consumer
   });
   
   consumer.on('end', () => {
     // clean up any resources that should be cleaned up after a consumer is closed,
     // for various reasons, e.g. the Pulsar server went down, or the SIGINT handler called
     // consumer.close() or pulsarClient.close()
   });
   
   consumer.on('error', (err) => {
     // The consumer received an error and is no longer usable... do cleanup and report
     // the issue
   });
   ```
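
   Until something like this exists in the library, a similar interface can be approximated in user land. This is a sketch under assumptions, not the library's API: `consumerToStream` is a hypothetical helper, and it relies on the same `receive(timeout)` polling and error strings as the first example. It feeds an async generator into Node's `Readable.from()`, which returns an object-mode stream that emits `'data'` for each yielded message, `'end'` when the generator returns, and `'error'` when it throws.

   ```js
   import { Readable } from "node:stream";

   // Assumption: same rejection messages as in the polling example above.
   const TIMEOUT_MESSAGE = "Failed to received message TimeOut";
   const CLOSED_MESSAGE = "Failed to received message AlreadyClosed";

   // Hypothetical adapter, not part of pulsar-client: wraps a consumer that only
   // offers receive(timeoutMs) in a Readable so callers can use .on('data'|'end'|'error').
   function consumerToStream(consumer, timeoutMs = 1000) {
     async function* poll() {
       while (true) {
         let msg;
         try {
           msg = await consumer.receive(timeoutMs);
         } catch (e) {
           if (e.message === TIMEOUT_MESSAGE) continue; // keep polling
           if (e.message === CLOSED_MESSAGE) return; // generator ends -> stream emits 'end'
           throw e; // generator throws -> stream is destroyed and emits 'error'
         }
         yield msg; // each yielded message becomes a 'data' event
       }
     }
     return Readable.from(poll());
   }
   ```

   Usage mirrors the desired API above: attach `'data'`, `'end'` and `'error'` listeners to the returned stream. Assuming closing the client causes the `AlreadyClosed` rejection seen in the first example, calling `pulsarClient.close()` from the SIGINT handler ends the stream and fires `'end'`.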
   
   My question is: is this the right way to use this library? Am I missing 
something, or do we need to use the sync methods `consumer.receive()` and 
`consumer.receive(timeout)` to release control to the main loop so we can do 
things like shut down the process?
   
   Thanks in advance!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

