congbobo184 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r887634742
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -414,13 +414,6 @@ void reconsumeLater(Message<?> message,
* @param messageId
* The {@code MessageId} to be cumulatively acknowledged
* @param txn {@link Transaction} the transaction to cumulative ack
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
- * if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
- * different from the transaction in pending ack.
- * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
- * broker don't support transaction
Review Comment:
Please don't delete these @throws tags.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -578,11 +579,32 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
}
@Override
- protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList,
- AckType ackType,
- Map<String, Long> properties,
- TransactionImpl txn) {
- return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties);
+ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
+ Map<String, Long> properties, TransactionImpl txn) {
+
+ for (MessageId messageId : messageIdList) {
+ checkArgument(messageId instanceof MessageIdImpl);
+ }
+ if (getState() != State.Ready && getState() != State.Connecting) {
+ stats.incrementNumAcksFailed();
+ PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
+ if (AckType.Individual.equals(ackType)) {
+ onAcknowledge(messageIdList, exception);
+ } else if (AckType.Cumulative.equals(ackType)) {
+ onAcknowledgeCumulative(messageIdList, exception);
+ }
+ return FutureUtil.failedFuture(exception);
+ }
+ if (txn != null) {
+ List<CompletableFuture<Void>> completableFutures = new LinkedList<>();
+ for (MessageId messageId : messageIdList) {
+ completableFutures.add(doTransactionAcknowledgeForResponse(messageId, ackType, null,
Review Comment:
The proto supports sending a list of message IDs, so it would be better to
build a single command that carries the whole messageIdList:
https://github.com/apache/pulsar/blob/9b01d1c472aa5cde5205ad86fa3b8f998da95f35/pulsar-common/src/main/proto/PulsarApi.proto#L553
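To illustrate the difference this comment is pointing at, here is a minimal, self-contained sketch. It does not use Pulsar's classes: `AckCommand`, `FakeConnection`, `ackOneByOne`, and `ackAsOneCommand` are hypothetical names, and message IDs are plain strings. It only assumes, as the comment says, that one ack command can carry a repeated list of message IDs alongside the transaction ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BatchedTxnAckSketch {

    /** Hypothetical stand-in for an ack command: one transaction id plus repeated message ids. */
    static final class AckCommand {
        final long txnMostBits;
        final long txnLeastBits;
        final List<String> messageIds;

        AckCommand(long txnMostBits, long txnLeastBits, List<String> messageIds) {
            this.txnMostBits = txnMostBits;
            this.txnLeastBits = txnLeastBits;
            this.messageIds = List.copyOf(messageIds);
        }
    }

    /** Hypothetical connection that only records the commands it was asked to send. */
    static final class FakeConnection {
        final List<AckCommand> sent = new ArrayList<>();

        CompletableFuture<Void> send(AckCommand command) {
            sent.add(command);
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Shape used in the quoted hunk: one command and one future per message id. */
    static CompletableFuture<Void> ackOneByOne(FakeConnection cnx, long most, long least,
                                               List<String> ids) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String id : ids) {
            futures.add(cnx.send(new AckCommand(most, least, List.of(id))));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    /** Shape suggested by the review: a single command that carries the whole list. */
    static CompletableFuture<Void> ackAsOneCommand(FakeConnection cnx, long most, long least,
                                                   List<String> ids) {
        return cnx.send(new AckCommand(most, least, ids));
    }

    public static void main(String[] args) {
        List<String> ids = List.of("1:0", "1:1", "1:2");

        FakeConnection perMessage = new FakeConnection();
        ackOneByOne(perMessage, 0L, 7L, ids).join();

        FakeConnection batched = new FakeConnection();
        ackAsOneCommand(batched, 0L, 7L, ids).join();

        System.out.println(perMessage.sent.size() + " commands vs " + batched.sent.size());
    }
}
```

With a single command there is one request and one response to track per transaction ack, instead of one per message ID.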
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -469,13 +462,6 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
*
* @param messageId {@link MessageId} to be individual acknowledged
* @param txn {@link Transaction} the transaction to cumulative ack
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
- * if the ack with messageId has been acked by another transaction
- * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
- * broker don't support transaction
- * don't find batch size in consumer pending ack
Review Comment:
If you want to delete this, please add the corresponding sync method to the Consumer API.
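One reading of this request: an async method reports failure through the returned future, so checked exceptions like the ones in the removed tags can only be declared and documented on a blocking variant. A rough sketch of that pairing follows. The names `AckException`, `ackAsync`, and `ack` are hypothetical, not the Consumer API, and the `fail` flag exists only to simulate a broker-side rejection.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SyncAckSketch {

    /** Hypothetical checked exception standing in for a client exception type. */
    static class AckException extends Exception {
        AckException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical async ack: failures surface only through the returned future. */
    static CompletableFuture<Void> ackAsync(String messageId, boolean fail) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("conflict on " + messageId));
        } else {
            future.complete(null);
        }
        return future;
    }

    /**
     * Hypothetical blocking variant; this is where {@code @throws} documentation can live.
     *
     * @throws AckException if the asynchronous ack completed exceptionally or was interrupted
     */
    static void ack(String messageId, boolean fail) throws AckException {
        try {
            ackAsync(messageId, fail).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AckException("interrupted while acknowledging " + messageId, e);
        } catch (ExecutionException e) {
            // Unwrap so callers see the original failure rather than the future's wrapper.
            throw new AckException(e.getCause().getMessage(), e.getCause());
        }
    }

    public static void main(String[] args) {
        try {
            ack("1:0", false);
            ack("1:1", true);
        } catch (AckException e) {
            System.out.println("ack failed: " + e.getMessage());
        }
    }
}
```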
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -606,12 +625,21 @@ protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageId
Map<String, Long> properties,
TransactionImpl txn) {
CompletableFuture<Void> ackFuture;
- if (txn != null) {
+ if (txn != null && this instanceof ConsumerImpl) {
+
+ // it is okay that we register acked topic after sending the acknowledgements. because
+ // the transactional ack will not be visiable for consumers until the transaction is
+ // committed
+ if (ackType == AckType.Cumulative) {
+ txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
+ }
+
ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
Review Comment:
We don't need to register the ack in this code.