JanYork opened a new issue, #836: URL: https://github.com/apache/rocketmq-clients/issues/836
### Before Creating the Bug Report

- [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions).
- [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions) of this repository and believe that this is not a duplicate.
- [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

### Programming Language of the Client

Node.js

### Runtime Platform Environment

macOS
Node.js 18.17.1
openjdk 17.0.11 2024-04-16
OpenJDK Runtime Environment Homebrew (build 17.0.11+0)
OpenJDK 64-Bit Server VM Homebrew (build 17.0.11+0, mixed mode, sharing)

### RocketMQ Version of the Client/Server

RocketMQ 5.3.0

### Run or Compiler Version

openjdk 17.0.11 2024-04-16
OpenJDK Runtime Environment Homebrew (build 17.0.11+0)
OpenJDK 64-Bit Server VM Homebrew (build 17.0.11+0, mixed mode, sharing)

### Describe the Bug

I got an unexplained error. I just started the program and waited for messages, doing nothing else, and suddenly an error occurred. I caught it, but I am not sure whether I should ignore it. It looks more like a bug or a problem that should not exist, and it interrupts my program.

`receive message error InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException.
org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24) {
  code: 50001
}
/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81
throw new InternalErrorException(status.code, status.message, requestId);
^
InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException.
org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24)`

### Steps to Reproduce

Start and run the following code in a Nest program, then send a few messages. Everything should work normally at this point. Then wait patiently: the error is not triggered within a short time. It seems that a scheduled task changes some state, so the wait can be long.
```ts
import { Body, Controller, OnModuleInit, Post } from "@nestjs/common";
import { MessageOptions, Producer, SimpleConsumer } from "rocketmq-client-nodejs";

interface IProducerQuery {
  topic: string;
  tag: string;
  keys?: string;
  body: string;
  group?: string;
  num?: number;
}

@Controller()
export class AppController implements OnModuleInit {
  private producer: Producer;

  async onModuleInit() {
    await this.producerInit();
    await this.consumerInit();
  }

  async producerInit() {
    const producer = new Producer({ endpoints: "127.0.0.1:8081" });
    this.producer = producer;
    await this.producer.startup()
      .catch((e) => {
        console.error("producer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("producer startup success!");
      });
  }

  async consumerInit() {
    const task_to_pool_consumer = new SimpleConsumer({
      consumerGroup: "task-to-pool",
      endpoints: "127.0.0.1:8081",
      subscriptions: new Map().set("task-processing", "task-to-pool")
    });
    await task_to_pool_consumer.startup()
      .catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("task to pool consumer startup success!");
        // Not awaited on purpose: the receive loop runs in the background.
        this.listenForMessages(task_to_pool_consumer, 5);
      });

    // One consumer per kitchen tag, all in the same consumer group.
    const kitchens = ["kitchen1", "kitchen2", "kitchen3", "kitchen4", "kitchen5"];
    const kitchen_consumers: SimpleConsumer[] = [];
    for (const kitchen of kitchens) {
      const task_assign_consumer = new SimpleConsumer({
        consumerGroup: "task-assign",
        endpoints: "127.0.0.1:8081",
        subscriptions: new Map().set("task-processing", kitchen)
      });
      kitchen_consumers.push(task_assign_consumer);
    }
    await Promise.all(kitchen_consumers.map((consumer: SimpleConsumer) => {
      // Return the promise so that Promise.all actually waits for each startup.
      return consumer.startup()
        .catch((e) => {
          console.error("consumer startup error", e);
          process.exit(1);
        })
        .then(() => {
          console.log("kitchen consumer startup success!");
          this.listenForMessages(consumer, 5);
        });
    }));

    const task_to_db_consumer = new SimpleConsumer({
      consumerGroup: "task-to-db",
      endpoints: "127.0.0.1:8081",
      subscriptions: new Map().set("task-write", "task-to-db")
    });
    await task_to_db_consumer.startup()
      .catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("task to db consumer startup success!");
        this.listenForMessages(task_to_db_consumer, 1);
      });

    const task_to_cache_consumer = new SimpleConsumer({
      consumerGroup: "task-to-cache",
      endpoints: "127.0.0.1:8081",
      subscriptions: new Map().set("task-write", "task-to-cache")
    });
    await task_to_cache_consumer.startup()
      .catch((e) => {
        console.error("consumer startup error", e);
        process.exit(1);
      })
      .then(() => {
        console.log("task to cache consumer startup success!");
        this.listenForMessages(task_to_cache_consumer, 1);
      });
  }

  async listenForMessages(consumer: SimpleConsumer, number: number) {
    while (true) {
      // Rethrowing ends this loop. Because callers do not await this method,
      // the rejection is unhandled and terminates the process.
      const messages = await consumer.receive(number).catch((e) => {
        console.error("receive message error", e);
        throw e;
      });
      if (!messages || messages.length === 0) {
        continue;
      }
      for (const message of messages) {
        console.log(`Received message: ${message.body.toString()}`);
        await consumer.ack(message)
          .catch((e) => {
            console.error("ack message error", e);
            throw e;
          })
          .then(() => {
            console.log("ack message success!");
          });
      }
    }
  }

  @Post("production")
  async production(@Body() query: IProducerQuery) {
    const options: MessageOptions = {
      topic: query.topic,
      tag: query.tag,
      // Fall back to a timestamp key when the request carries no key.
      keys: query.keys ? [query.keys] : [Date.now().toString()],
      body: Buffer.from(query.body)
    };
    if (query.group) {
      options.messageGroup = query.group;
    }

    // When num > 1, send num messages, each with its own key and a numbered body.
    if (query.num && query.num > 1) {
      for (let i = 0; i < query.num; i++) {
        options.keys = [Date.now().toString()];
        options.body = Buffer.from(query.body + i);
        await this.producer
          .send(options)
          .catch((e) => {
            console.error("producer send message error", e);
          })
          .then((r) => {
            console.log(`producer send message success! receipt -> ${r ? JSON.stringify(r) : "null"}`);
          });
      }
      return;
    }

    await this.producer
      .send(options)
      .catch((e) => {
        console.error("producer send message error", e);
      })
      .then((r) => {
        console.log(`producer send message success! receipt -> ${r ? JSON.stringify(r) : "null"}`);
      });
  }
}
```

If everything is set up as above, wait until the program fails with this error:

```
receive message error InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException.
org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24) {
  code: 50001
}

/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81
throw new InternalErrorException(status.code, status.message, requestId);
^
InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException.
org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24)
```

The exception is thrown on the client by StatusChecker.check(status) in the receiveMessage method, but the message comes from the proxy: while handling the receive request, ReceiveMessageActivity.receiveMessage called Settings.getSubscription() on a `settings` object that was null, which caused the NullPointerException. Why do the Settings (and with them the subscription data) suddenly disappear? And why is the null case not handled, so that a NullPointerException is possible at all?

### What Did You Expect to See?

Continuous, normal operation.

### What Did You See Instead?

```
receive message error InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException.
org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24) {
  code: 50001
}

/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81
throw new InternalErrorException(status.code, status.message, requestId);
^
InternalErrorException: [request-id=undefined, response-code=50001] Cannot invoke "apache.rocketmq.v2.Settings.getSubscription()" because "settings" is null. NullPointerException.
org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.receiveMessage(ReceiveMessageActivity.java:63)
    at Function.check (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/exception/StatusChecker.ts:81:15)
    at SimpleConsumer.receiveMessage (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/Consumer.ts:78:19)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at SimpleConsumer.receive (/Users/muyouzhi/Code/demo/nest-demo/node_modules/.pnpm/[email protected]/node_modules/rocketmq-client-nodejs/src/consumer/SimpleConsumer.ts:122:12)
    at AppController.listenForMessages (/Users/muyouzhi/Code/demo/nest-demo/src/app.controller.ts:107:24)
```

### Additional Context

_No response_

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
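
On the question of whether the error can be ignored: the sketch below is one possible client-side stopgap, not a fix for the null `settings` on the proxy. It retries a rejected `receive()` call with exponential backoff instead of rethrowing, so one failed call does not end the receive loop. It assumes, without confirmation, that response code 50001 is transient and that retrying is safe. `ReceiverLike`, `receiveWithRetry`, `sleep`, and the retry parameters are hypothetical names and values introduced for illustration; none of them belong to rocketmq-client-nodejs.

```typescript
// Hypothetical minimal shape of a consumer: only the receive() call used in
// the reproduction code above is assumed here.
interface ReceiverLike<M> {
  receive(maxMessageNum: number): Promise<M[]>;
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Calls receive() and retries with exponential backoff when it rejects.
// maxRetries, baseDelayMs and maxDelayMs are illustrative values, not
// defaults of any library.
async function receiveWithRetry<M>(
  consumer: ReceiverLike<M>,
  maxMessageNum: number,
  maxRetries = 5,
  baseDelayMs = 500,
  maxDelayMs = 10000,
): Promise<M[]> {
  let attempt = 0;
  while (true) {
    try {
      return await consumer.receive(maxMessageNum);
    } catch (e) {
      if (attempt >= maxRetries) {
        throw e; // retries exhausted: let the caller decide what to do
      }
      // Delay doubles on each attempt and is capped at maxDelayMs.
      const delay = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
      console.error(`receive failed (attempt ${attempt + 1}), retrying in ${delay} ms`, e);
      attempt++;
      await sleep(delay);
    }
  }
}
```

In `listenForMessages`, `consumer.receive(number)` could then be replaced with `receiveWithRetry(consumer, number)`. If the proxy keeps returning the same error, the final rejection still reaches the caller, so the call to `listenForMessages` should also be given a `.catch(...)` handler.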
