codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r787417655
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -760,75 +785,86 @@ public void connectionOpened(final ClientCnx cnx) {
}
// startMessageRollbackDurationInSec should be consider only once when
consumer connects to first time
long startMessageRollbackDuration = (startMessageRollbackDurationInSec
> 0
- && startMessageId != null &&
startMessageId.equals(initialStartMessageId)) ?
startMessageRollbackDurationInSec : 0;
- ByteBuf request = Commands.newSubscribe(topic, subscription,
consumerId, requestId, getSubType(), priorityLevel,
- consumerName, isDurable, startMessageIdData, metadata,
readCompacted,
- conf.isReplicateSubscriptionState(),
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
- startMessageRollbackDuration, si, createTopicIfDoesNotExist,
conf.getKeySharedPolicy(),
- conf.getSubscriptionProperties());
-
- cnx.sendRequestWithId(request, requestId).thenRun(() -> {
- synchronized (ConsumerImpl.this) {
- if (changeToReadyState()) {
- consumerIsReconnectedToBroker(cnx, currentSize);
- } else {
+ && startMessageId != null
+ && startMessageId.equals(initialStartMessageId)) ?
startMessageRollbackDurationInSec : 0;
+
+ // synchronized this, because redeliverUnAckMessage eliminate the
epoch inconsistency between them
+ synchronized (this) {
+ setClientCnx(cnx);
+ ByteBuf request = Commands.newSubscribe(topic, subscription,
consumerId, requestId, getSubType(),
+ priorityLevel, consumerName, isDurable,
startMessageIdData, metadata, readCompacted,
+ conf.isReplicateSubscriptionState(),
+
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
+ startMessageRollbackDuration, si,
createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
+ // this.consumerEpoch will increase
Review comment:
Use the current epoch to subscribe.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1705,29 +1762,52 @@ public int numMessagesInQueue() {
@Override
public void redeliverUnacknowledgedMessages() {
- ClientCnx cnx = cnx();
- if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >=
ProtocolVersion.v2.getValue()) {
- int currentSize = 0;
- synchronized (this) {
- currentSize = incomingMessages.size();
- clearIncomingMessages();
- unAckedMessageTracker.clear();
- }
-
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId),
cnx.ctx().voidPromise());
- if (currentSize > 0) {
- increaseAvailablePermits(cnx, currentSize);
+ // First : synchronized in order to handle consumer reconnect produce
race condition, when broker receive
+ // redeliverUnacknowledgedMessages and consumer have not be created and
+ // then receive reconnect epoch change the broker is smaller than the
client epoch, this will cause client epoch
+ // smaller than broker epoch forever. client will not receive message
anymore.
+ // Second : we should synchronized `ClientCnx cnx = cnx()` to
+ // prevent use old cnx to send redeliverUnacknowledgedMessages to a
old broker
+ synchronized (ConsumerImpl.this) {
+ ClientCnx cnx = cnx();
+ // V1 don't support redeliverUnacknowledgedMessages
+ if (cnx != null && cnx.getRemoteEndpointProtocolVersion() <
ProtocolVersion.v2.getValue()) {
+ if ((getState() == State.Connecting)) {
+ log.warn("[{}] Client Connection needs to be established "
+
+ "for redelivery of unacknowledged messages", this);
+ } else {
+ log.warn("[{}] Reconnecting the client to redeliver the
messages.", this);
+ cnx.ctx().close();
+ }
+
+ return;
}
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] [{}] Redeliver unacked messages and send
{} permits", subscription, topic,
- consumerName, currentSize);
+
+ // clear local message
+ int currentSize = 0;
+ currentSize = incomingMessages.size();
+ clearIncomingMessages();
+ unAckedMessageTracker.clear();
+
+ // is channel is connected, we should send redeliver command to
broker
+ if (isConnected(cnx) && cnx != null) {
Review comment:
```suggestion
if (cnx != null && isConnected(cnx)) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]