hepyu opened a new issue #12800: URL: https://github.com/apache/pulsar/issues/12800
Suspected pulsar-broker bug, but I cannot confirm it.

Contents:
(1). Why this is not written in English
(2). pulsar deployment version & details & architecture
(3). Problem & symptoms, usage description & desensitized code
1. Problem
2. Symptoms, usage description & desensitized code

(1). Why this is not written in English

My English is not good. I can read it without trouble, but if I described the whole problem and its symptoms in English I could not guarantee the description would be accurate, so I wrote it in Chinese.

(2). pulsar deployment version & details & architecture

The pulsar version is 2.8.0, deployed on openjdk11; the exact version is 11.0.12.
It is deployed on aws overseas on c5a.2xlarge instances (8c16g), 3 machines in total, each running one broker, one bookie, and one zk. The startup command parameters were not modified; all are defaults.

Deployment details:
pulsar-7:aws上部署生产级别的5节点pulsar集群 (pulsar-7: deploying a production-grade 5-node pulsar cluster on aws)
https://mp.weixin.qq.com/s/YwCr-l2WcM4fJVg7NIx_HA

(3). Problem & symptoms, usage description & desensitized code

1. Problem

On a topic with 40 partitions, when messages are severely unbalanced across partitions, individual partitions cannot be consumed by the consumer. In the most recent occurrence, two partitions each accumulated a backlog of around 300,000 (the backlog value).

2. Symptoms, usage description & desensitized code

The producer sends in batches (about 8 messages per batch on average, 40 at most) and asynchronously, using key-sharding to send to the different partitions of this topic. Desensitized code:

```
package test;

import com.google.common.collect.Lists;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.pulsar.client.api.Producer;

@Service
public class PushService implements IPushService {

    private final static Logger logger = LoggerFactory.getLogger(PushService.class);

    @Resource
    private IMqBackoffService mqBackoffService;

    @Resource(name = "XXXStatusProducer")
    private Producer<byte[]> YYYStatusUUUProducer;

    @Override
    public void pushZZZPPPRRRStatus(Long userId, List<YYYStatusResp> RRRList) {
        String content = GsonUtil.beanToJsonString(RRRList);
        try {
            // Sent asynchronously and in batches; the key is used to shard messages
            // across the topic's partitions.
            YYYStatusUUUProducer.newMessage().key(String.valueOf(userId))
                    .value(content.getBytes(StandardCharsets.UTF_8)).sendAsync().exceptionally((e -> {
                        logger.error("send sync ZZZ PPP RRR status error,content:{}", content, e);
                        // If the async send fails, the failed message is saved to an aws aurora
                        // database using a local database transaction (shardingjdbc4.1.1);
                        // transactions are declared with annotations.
                        mqBackoffService.saveYYYStatus(RRRList);
                        return null;
                    }));
        } catch (Exception e) {
            logger.error("send ZZZ PPP RRR status error,content:{}", content, e);
            mqBackoffService.saveYYYStatus(RRRList);
        }
    }
}
```

The consumer consumes in key-sharding mode. Desensitized code:

```
package test;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.List;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

@Component
public class XXXResultMessageListener implements MessageListener<byte[]> {

    private static final Logger log = LoggerFactory.getLogger(XXXResultMessageListener.class);

    private final IoooService oooService;

    public XXXResultMessageListener(IoooService oooService) {
        this.oooService = oooService;
    }

    @SneakyThrows
    @LogTid
    @Override
    public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
        String body = new String(msg.getValue());
        try {
            if (StrUtil.isBlank(body)) {
                log.warn("YYY result received msg value is null messageId:{}", msg.getMessageId());
                consumer.acknowledge(msg.getMessageId());
                return;
            }
            log.info("YYY result,topicName:{},message:{}", msg.getTopicName(), body);
            List<TTTUUUStatusResp> TTTUUUStatusRespList = JSONUtil.toList(body, TTTUUUStatusResp.class);
            if (CollUtil.isNotEmpty(TTTUUUStatusRespList)) {
                TTTUUUStatusResp TTTUUUStatusResp = TTTUUUStatusRespList.get(0);
                if (UserTypeEnum.PPP.getCode().equals(TTTUUUStatusResp.getUserType())) {
                    // Only sets a flag that controls whether logs are printed.
                    ThreadFilter.setPrintFlag(false);
                }
                // Processed synchronously: the next message is fetched only after this one
                // completes. This involves local transactions on the aurora database via
                // shardingjdbc4.1.1; all transactions are declared with annotations.
                oooService.vvvBatchReceived(TTTUUUStatusRespList);
            }
            consumer.acknowledge(msg.getMessageId());
        } catch (Exception e) {
            if (e instanceof BaseRuntimeException) {
                // BaseRuntimeException is a business exception; by business rule it needs
                // no handling, so ack directly.
                consumer.acknowledge(msg.getMessageId());
            } else {
                log.error("YYY result Failed to process message error,data is {},e:{}", body, e.getStackTrace(), e);
                // If business processing fails, tell pulsar to redeliver the message.
                consumer.negativeAcknowledge(msg);
            }
            throw e;
        } finally {
            ThreadFilter.removePrintFlag();
        }
    }
}
```

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
