Copilot commented on code in PR #25120:
URL: https://github.com/apache/pulsar/pull/25120#discussion_r2657434332
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final
List<EntryAndMetadata>
if (metadata == null || !metadata.hasUuid() ||
!metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new
ArrayList<>()).add(entryAndMetadata);
+ availablePermits--;
} else {
- final Consumer consumerForUuid = getConsumerForUuid(metadata,
consumer, availablePermits);
+ final String uuid = metadata.getUuid();
+ Consumer consumerForUuid = uuidToConsumer.get(uuid);
if (consumerForUuid == null) {
- unassignedMessageProcessor.accept(entryAndMetadata);
- continue;
+ if (metadata.getChunkId() != 0) {
+ if (subscription != null) {
+ log.warn("[{}][{}] Skip the message because of it
not the first chunk."
+ + " Position: {}, UUID: {},
ChunkId: {}, NumChunksFromMsg: {}",
+ subscription.getTopicName(),
subscription.getName(), entryAndMetadata.getPosition(),
+ metadata.getUuid(), metadata.getChunkId(),
metadata.getNumChunksFromMsg());
+ // Directly ack the message
+ if (!(subscription instanceof
PulsarCompactorSubscription)) {
Review Comment:
The log format string is split into two literals joined with a plus sign,
which makes the message template harder to read. Consider writing the
template as a single string literal so that the placeholders and their
arguments are easy to match up.
```suggestion
                        log.warn("[{}][{}] Skip the message because it is not the first chunk. Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
                                subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
                                metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
                        // Directly ack the message
                        if (!(subscription instanceof PulsarCompactorSubscription)) {
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final
List<EntryAndMetadata>
if (metadata == null || !metadata.hasUuid() ||
!metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new
ArrayList<>()).add(entryAndMetadata);
+ availablePermits--;
} else {
- final Consumer consumerForUuid = getConsumerForUuid(metadata,
consumer, availablePermits);
+ final String uuid = metadata.getUuid();
+ Consumer consumerForUuid = uuidToConsumer.get(uuid);
if (consumerForUuid == null) {
- unassignedMessageProcessor.accept(entryAndMetadata);
- continue;
+ if (metadata.getChunkId() != 0) {
+ if (subscription != null) {
+ log.warn("[{}][{}] Skip the message because of it
not the first chunk."
+ + " Position: {}, UUID: {},
ChunkId: {}, NumChunksFromMsg: {}",
+ subscription.getTopicName(),
subscription.getName(), entryAndMetadata.getPosition(),
+ metadata.getUuid(), metadata.getChunkId(),
metadata.getNumChunksFromMsg());
+ // Directly ack the message
+ if (!(subscription instanceof
PulsarCompactorSubscription)) {
+
subscription.acknowledgeMessage(Collections.singletonList(
+ entryAndMetadata.getPosition()),
AckType.Individual, Collections.emptyMap());
+ entryAndMetadata.release();
+ }
+ }
Review Comment:
When subscription is null or the subscription is a
PulsarCompactorSubscription, non-first chunks without a cached consumer are not
acknowledged or released (the if block at lines 97-108 doesn't execute
acknowledgment). However, the code still proceeds to cache the UUID mapping
(line 111) and add the message to consumerToEntries (line 120). This means
subsequent chunks with the same UUID will be delivered to this consumer, even
though chunk0 was never received. This partially defeats the purpose of the
fix, as the subscription will still receive incomplete chunked messages in
these cases.
```suggestion
                                entryAndMetadata.release();
                            } else {
                                // For compactor subscriptions, just release the entry without ack
                                entryAndMetadata.release();
                            }
                        } else {
                            // No subscription available, just release the entry
                            entryAndMetadata.release();
                        }
                        // Do not cache UUID mapping or deliver this non-first chunk
                        continue;
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java:
##########
@@ -58,7 +58,7 @@ public void prepareData() {
roundRobinConsumerSelector.clear();
entryAndMetadataList.clear();
replayQueue.clear();
- assignor = new SharedConsumerAssignor(roundRobinConsumerSelector,
replayQueue::add);
+ assignor = new SharedConsumerAssignor(roundRobinConsumerSelector,
replayQueue::add, null);
Review Comment:
The test passes null for the subscription parameter, which means the new
logic for handling missing chunk0 (lines 97-108 in SharedConsumerAssignor.java)
is not covered by existing tests. Consider adding test cases that:
1. Verify non-first chunks are properly acknowledged when chunk0 is missing
2. Test the behavior when subscription is a PulsarCompactorSubscription
3. Ensure that acknowledged chunks don't get dispatched to consumers
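The three cases above could be exercised roughly as follows. This is a minimal,
self-contained sketch in plain Java, not the real test: `Chunk`,
`FakeSubscription`, and `assign` are hypothetical stand-ins for
`EntryAndMetadata`, `Subscription`, and `SharedConsumerAssignor.assign`, and the
model assumes orphaned non-first chunks are skipped with `continue` after being
released. A real test would pass a mocked subscription to the assignor instead
of `null`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OrphanChunkSketch {
    /** Hypothetical stand-in for EntryAndMetadata: one chunk of a chunked message. */
    static final class Chunk {
        final String uuid;
        final int chunkId;
        final int numChunks;
        boolean released;

        Chunk(String uuid, int chunkId, int numChunks) {
            this.uuid = uuid;
            this.chunkId = chunkId;
            this.numChunks = numChunks;
        }
    }

    /** Hypothetical stand-in for Subscription that records which chunks were acked. */
    static final class FakeSubscription {
        final boolean compactor;
        final List<Chunk> acked = new ArrayList<>();

        FakeSubscription(boolean compactor) {
            this.compactor = compactor;
        }
    }

    /** Simplified model of the assign loop for a single consumer; returns the dispatched chunks. */
    static List<Chunk> assign(List<Chunk> chunks, FakeSubscription subscription) {
        Map<String, String> uuidToConsumer = new HashMap<>();
        List<Chunk> dispatched = new ArrayList<>();
        for (Chunk chunk : chunks) {
            if (!uuidToConsumer.containsKey(chunk.uuid)) {
                if (chunk.chunkId != 0) {
                    // Orphaned non-first chunk: ack it unless there is no subscription
                    // or it is a compactor subscription, then release and skip it.
                    if (subscription != null && !subscription.compactor) {
                        subscription.acked.add(chunk);
                    }
                    chunk.released = true;
                    continue;
                }
                uuidToConsumer.put(chunk.uuid, "consumer-1");
            }
            if (chunk.chunkId == chunk.numChunks - 1) {
                // Last chunk received: drop the uuid from the cache.
                uuidToConsumer.remove(chunk.uuid);
            }
            dispatched.add(chunk);
        }
        return dispatched;
    }

    public static void main(String[] args) {
        List<Chunk> input = new ArrayList<>();
        input.add(new Chunk("A", 1, 3)); // chunk0 of "A" is missing
        input.add(new Chunk("B", 0, 2));
        input.add(new Chunk("B", 1, 2));
        FakeSubscription subscription = new FakeSubscription(false);
        List<Chunk> dispatched = assign(input, subscription);
        System.out.println("dispatched=" + dispatched.size() + " acked=" + subscription.acked.size());
    }
}
```

In this model, the orphaned chunk of message "A" is acked and released but never
dispatched, while both chunks of message "B" reach the consumer.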
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final
List<EntryAndMetadata>
if (metadata == null || !metadata.hasUuid() ||
!metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new
ArrayList<>()).add(entryAndMetadata);
+ availablePermits--;
} else {
- final Consumer consumerForUuid = getConsumerForUuid(metadata,
consumer, availablePermits);
+ final String uuid = metadata.getUuid();
+ Consumer consumerForUuid = uuidToConsumer.get(uuid);
if (consumerForUuid == null) {
- unassignedMessageProcessor.accept(entryAndMetadata);
- continue;
+ if (metadata.getChunkId() != 0) {
+ if (subscription != null) {
+ log.warn("[{}][{}] Skip the message because of it
not the first chunk."
+ + " Position: {}, UUID: {},
ChunkId: {}, NumChunksFromMsg: {}",
+ subscription.getTopicName(),
subscription.getName(), entryAndMetadata.getPosition(),
+ metadata.getUuid(), metadata.getChunkId(),
metadata.getNumChunksFromMsg());
+ // Directly ack the message
+ if (!(subscription instanceof
PulsarCompactorSubscription)) {
+
subscription.acknowledgeMessage(Collections.singletonList(
+ entryAndMetadata.getPosition()),
AckType.Individual, Collections.emptyMap());
+ entryAndMetadata.release();
+ }
+ }
+ }
+ consumerForUuid = consumer;
+ uuidToConsumer.put(uuid, consumerForUuid);
+ }
+
+ final int permits =
consumerToPermits.computeIfAbsent(consumerForUuid,
Consumer::getAvailablePermits);
+ if (metadata.getChunkId() == metadata.getNumChunksFromMsg() -
1) {
+ // The last chunk is received, we should remove the uuid
Review Comment:
The comment says "we should remove the uuid" but does not say from where.
Consider "we should remove the uuid from the cache".
```suggestion
                    // The last chunk is received, we should remove the uuid from the cache
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -20,18 +20,24 @@
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
+import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
/**
* The assigner to assign entries to the proper {@link Consumer} in the shared
subscription.
*/
+
Review Comment:
There is an unnecessary blank line between the Javadoc comment and the class
annotations. The annotations should immediately follow the Javadoc without a
blank line in between.
```suggestion
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final
List<EntryAndMetadata>
if (metadata == null || !metadata.hasUuid() ||
!metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new
ArrayList<>()).add(entryAndMetadata);
+ availablePermits--;
} else {
- final Consumer consumerForUuid = getConsumerForUuid(metadata,
consumer, availablePermits);
+ final String uuid = metadata.getUuid();
+ Consumer consumerForUuid = uuidToConsumer.get(uuid);
if (consumerForUuid == null) {
- unassignedMessageProcessor.accept(entryAndMetadata);
- continue;
+ if (metadata.getChunkId() != 0) {
+ if (subscription != null) {
+ log.warn("[{}][{}] Skip the message because of it
not the first chunk."
Review Comment:
The log message contains a grammatical error: "because of it not the first
chunk" should be "because it is not the first chunk".
```suggestion
                            log.warn("[{}][{}] Skip the message because it is not the first chunk."
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final
List<EntryAndMetadata>
if (metadata == null || !metadata.hasUuid() ||
!metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new
ArrayList<>()).add(entryAndMetadata);
+ availablePermits--;
} else {
- final Consumer consumerForUuid = getConsumerForUuid(metadata,
consumer, availablePermits);
+ final String uuid = metadata.getUuid();
+ Consumer consumerForUuid = uuidToConsumer.get(uuid);
if (consumerForUuid == null) {
- unassignedMessageProcessor.accept(entryAndMetadata);
- continue;
+ if (metadata.getChunkId() != 0) {
+ if (subscription != null) {
+ log.warn("[{}][{}] Skip the message because of it
not the first chunk."
+ + " Position: {}, UUID: {},
ChunkId: {}, NumChunksFromMsg: {}",
+ subscription.getTopicName(),
subscription.getName(), entryAndMetadata.getPosition(),
+ metadata.getUuid(), metadata.getChunkId(),
metadata.getNumChunksFromMsg());
+ // Directly ack the message
+ if (!(subscription instanceof
PulsarCompactorSubscription)) {
+
subscription.acknowledgeMessage(Collections.singletonList(
+ entryAndMetadata.getPosition()),
AckType.Individual, Collections.emptyMap());
+ entryAndMetadata.release();
+ }
+ }
+ }
+ consumerForUuid = consumer;
+ uuidToConsumer.put(uuid, consumerForUuid);
+ }
+
+ final int permits =
consumerToPermits.computeIfAbsent(consumerForUuid,
Consumer::getAvailablePermits);
+ if (metadata.getChunkId() == metadata.getNumChunksFromMsg() -
1) {
+ // The last chunk is received, we should remove the uuid
+ uuidToConsumer.remove(uuid);
}
+
consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new
ArrayList<>()).add(entryAndMetadata);
Review Comment:
When a non-first chunk is acknowledged and released (lines 104-106), the
message should not continue to be processed. However, the code proceeds to add
this already-released message to consumerToEntries (line 120) and cache its
UUID mapping (line 111). This will cause the consumer to receive a message
whose buffer has already been released, leading to potential reference counting
errors or access to freed memory. After acknowledging and releasing the
message, the code should continue to the next iteration of the loop without
further processing.
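The hazard can be illustrated without Netty or Pulsar. The sketch below uses a
hypothetical `CountedBuffer` class (an assumption for illustration, standing in
for the reference-counted entry behind `EntryAndMetadata`) to contrast the
flagged flow, where a released entry is still queued for dispatch, with a flow
that skips the entry after releasing it.

```java
import java.util.ArrayList;
import java.util.List;

final class ReleaseThenDispatchSketch {
    /** Hypothetical reference-counted buffer; not the Netty or Pulsar type. */
    static final class CountedBuffer {
        private int refCnt = 1;
        private final String payload;

        CountedBuffer(String payload) {
            this.payload = payload;
        }

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }

        String read() {
            if (refCnt == 0) {
                throw new IllegalStateException("read after release");
            }
            return payload;
        }
    }

    /** Mirrors the flagged flow: release the orphaned chunk, then queue it anyway. */
    static List<CountedBuffer> releaseAndDispatch(CountedBuffer orphan) {
        List<CountedBuffer> toDispatch = new ArrayList<>();
        orphan.release();
        toDispatch.add(orphan); // the released entry still reaches the consumer
        return toDispatch;
    }

    /** Mirrors the proposed flow: release the orphaned chunk and skip it. */
    static List<CountedBuffer> releaseAndSkip(CountedBuffer orphan) {
        List<CountedBuffer> toDispatch = new ArrayList<>();
        orphan.release();
        return toDispatch; // equivalent to `continue` in the real loop
    }

    /** Returns true if reading any dispatched buffer fails because it was released. */
    static boolean readFails(List<CountedBuffer> dispatched) {
        try {
            for (CountedBuffer buffer : dispatched) {
                buffer.read();
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(readFails(releaseAndDispatch(new CountedBuffer("chunk-1")))); // true
        System.out.println(readFails(releaseAndSkip(new CountedBuffer("chunk-1"))));     // false
    }
}
```

In the real code the failure would likely surface later and less predictably
than in this sketch, since the consumer reads the entry on a different code
path.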