shenxinquan opened a new issue #407: Bug when calling shutdown() on TransactionMQProducer
URL: https://github.com/apache/rocketmq/issues/407

Code snippet:

    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
    producer.setExecutorService(executorService);

Description: when a custom thread pool is set on the producer, calling shutdown() on the producer throws the following error:

    Exception in thread "main" java.lang.NullPointerException
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.destroyTransactionEnv(DefaultMQProducerImpl.java:135)
        at org.apache.rocketmq.client.producer.TransactionMQProducer.shutdown(TransactionMQProducer.java:50)
        at com.mmall.concurrency.rocketmq.TransactionProducer.main(TransactionProducer.java:89)

The failure is in the destroyTransactionEnv() method of the DefaultMQProducerImpl class:

    public void destroyTransactionEnv() {
        this.checkExecutor.shutdown();
        this.checkRequestQueue.clear();
    }

The call this.checkRequestQueue.clear() is the statement that throws.

Analysis: the root cause is in the initTransactionEnv() method of DefaultMQProducerImpl. When an ExecutorService has been supplied, this.checkRequestQueue is never initialized, so it is still null when destroyTransactionEnv() runs. The code is as follows:

    public void initTransactionEnv() {
        TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
        if (producer.getExecutorService() != null) {
            this.checkExecutor = producer.getExecutorService();
        } else {
            this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000);
            this.checkExecutor = new ThreadPoolExecutor(
                1,
                1,
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.checkRequestQueue);
        }
    }
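One possible way to avoid the NullPointerException is to guard both fields in destroyTransactionEnv(). The sketch below is not the RocketMQ source and not necessarily the fix the project adopted. TransactionEnvSketch and hasInternalQueue() are hypothetical names used only for illustration, and the custom executor is passed in as a parameter instead of being read from the producer. It reproduces the two branches described above with plain java.util.concurrent types.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the relevant part of DefaultMQProducerImpl.
class TransactionEnvSketch {
    private BlockingQueue<Runnable> checkRequestQueue; // stays null when a custom executor is used
    private ExecutorService checkExecutor;

    // Mirrors the two branches of initTransactionEnv() quoted in the issue.
    void initTransactionEnv(ExecutorService customExecutor) {
        if (customExecutor != null) {
            this.checkExecutor = customExecutor;
        } else {
            this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000);
            this.checkExecutor = new ThreadPoolExecutor(
                1, 1, 1000 * 60, TimeUnit.MILLISECONDS, this.checkRequestQueue);
        }
    }

    // Assumed fix: only touch fields that were actually initialized.
    void destroyTransactionEnv() {
        if (this.checkExecutor != null) {
            this.checkExecutor.shutdown();
        }
        if (this.checkRequestQueue != null) {
            this.checkRequestQueue.clear();
        }
    }

    // Illustration-only helper: reports whether the internal queue was created.
    boolean hasInternalQueue() {
        return this.checkRequestQueue != null;
    }

    public static void main(String[] args) {
        ExecutorService custom = Executors.newFixedThreadPool(2);
        TransactionEnvSketch env = new TransactionEnvSketch();
        env.initTransactionEnv(custom);
        env.destroyTransactionEnv(); // no NullPointerException on the custom-executor path
        System.out.println("custom executor shut down: " + custom.isShutdown());
    }
}
```

An alternative with the same effect would be to initialize checkRequestQueue unconditionally in initTransactionEnv(). The null guard has the advantage that it also makes destroyTransactionEnv() safe if it is called before initialization.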
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
