lhotari commented on code in PR #25120:
URL: https://github.com/apache/pulsar/pull/25120#discussion_r2765240365


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -106,6 +105,64 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final 
List<EntryAndMetadata>
         return consumerToEntries;
     }
 
+    private int assignChunk(EntryAndMetadata entryAndMetadata, MessageMetadata 
metadata, Consumer consumer,
+            Map<Consumer, List<EntryAndMetadata>> consumerToEntries, int 
availablePermits) {
+        final String uuid = metadata.getUuid();
+        Consumer consumerForUuid = uuidToConsumer.get(uuid);
+        if (consumerForUuid == null) {
+            if (skipChunk(entryAndMetadata, metadata)) {
+                return availablePermits;
+            }
+            consumerForUuid = consumer;
+            uuidToConsumer.put(uuid, consumerForUuid);
+        }
+
+        final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, 
Consumer::getAvailablePermits);
+        if (permits <= 0) {
+            unassignedMessageProcessor.accept(entryAndMetadata);
+            return availablePermits;
+        }
+        if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
+            // The last chunk is received, we should remove the uuid from the 
cache.
+            uuidToConsumer.remove(uuid);
+        }
+
+        addEntry(consumerToEntries, consumerForUuid, entryAndMetadata);
+        consumerToPermits.put(consumerForUuid, permits - 1);
+        if (consumerForUuid == consumer) {
+            return availablePermits - 1;
+        }
+        return availablePermits;
+    }
+
+    private boolean isChunkedMessage(MessageMetadata metadata) {
+        return metadata != null && metadata.hasUuid() && metadata.hasChunkId() 
&& metadata.hasNumChunksFromMsg();
+    }
+
+    private void addEntry(Map<Consumer, List<EntryAndMetadata>> 
consumerToEntries, Consumer consumer,
+            EntryAndMetadata entry) {
+        consumerToEntries.computeIfAbsent(consumer, __ -> new 
ArrayList<>()).add(entry);
+    }
+
+    private boolean skipChunk(EntryAndMetadata entryAndMetadata, 
MessageMetadata metadata) {
+        if (metadata.getChunkId() != 0) {
+            if (subscription != null) {
+                log.warn("[{}][{}] Skip the message because it is not the 
first chunk."
+                                + " Position: {}, UUID: {}, ChunkId: {}, 
NumChunksFromMsg: {}",
+                        subscription.getTopicName(), subscription.getName(), 
entryAndMetadata.getPosition(),
+                        metadata.getUuid(), metadata.getChunkId(), 
metadata.getNumChunksFromMsg());
+                // Directly ack the message.
+                if (!(subscription instanceof PulsarCompactorSubscription)) {
+                    subscription.acknowledgeMessage(Collections.singletonList(
+                            entryAndMetadata.getPosition()), 
AckType.Individual, Collections.emptyMap());
+                }
+            }
+            entryAndMetadata.release();
+            return true;

Review Comment:
   Since there's a potential for data loss, the messages should be acknowledged 
only if `autoSkipNonRecoverableData` is set in `broker.conf`. When 
`autoSkipNonRecoverableData` isn't set, the log message should be emitted at 
`ERROR` level and the message shouldn't get acknowledged. I think it's fine to 
skip the message in that case so that processing the subscription continues, 
but there will be a backlog left behind due to the unacked messages.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -106,6 +105,64 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final 
List<EntryAndMetadata>
         return consumerToEntries;
     }
 
+    private int assignChunk(EntryAndMetadata entryAndMetadata, MessageMetadata 
metadata, Consumer consumer,
+            Map<Consumer, List<EntryAndMetadata>> consumerToEntries, int 
availablePermits) {
+        final String uuid = metadata.getUuid();
+        Consumer consumerForUuid = uuidToConsumer.get(uuid);
+        if (consumerForUuid == null) {
+            if (skipChunk(entryAndMetadata, metadata)) {
+                return availablePermits;
+            }
+            consumerForUuid = consumer;
+            uuidToConsumer.put(uuid, consumerForUuid);
+        }
+
+        final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, 
Consumer::getAvailablePermits);
+        if (permits <= 0) {
+            unassignedMessageProcessor.accept(entryAndMetadata);
+            return availablePermits;
+        }
+        if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
+            // The last chunk is received, we should remove the uuid from the 
cache.
+            uuidToConsumer.remove(uuid);
+        }
+
+        addEntry(consumerToEntries, consumerForUuid, entryAndMetadata);
+        consumerToPermits.put(consumerForUuid, permits - 1);
+        if (consumerForUuid == consumer) {
+            return availablePermits - 1;
+        }
+        return availablePermits;
+    }
+
+    private boolean isChunkedMessage(MessageMetadata metadata) {
+        return metadata != null && metadata.hasUuid() && metadata.hasChunkId() 
&& metadata.hasNumChunksFromMsg();
+    }
+
+    private void addEntry(Map<Consumer, List<EntryAndMetadata>> 
consumerToEntries, Consumer consumer,
+            EntryAndMetadata entry) {
+        consumerToEntries.computeIfAbsent(consumer, __ -> new 
ArrayList<>()).add(entry);
+    }
+
+    private boolean skipChunk(EntryAndMetadata entryAndMetadata, 
MessageMetadata metadata) {

Review Comment:
   `skipChunk` sounds like a command; it would be better to rename it so that 
it reads like a query.
   
   ```suggestion
       private boolean shouldSkipChunk(EntryAndMetadata entryAndMetadata, 
MessageMetadata metadata) {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -106,6 +105,64 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final 
List<EntryAndMetadata>
         return consumerToEntries;
     }
 
+    private int assignChunk(EntryAndMetadata entryAndMetadata, MessageMetadata 
metadata, Consumer consumer,
+            Map<Consumer, List<EntryAndMetadata>> consumerToEntries, int 
availablePermits) {
+        final String uuid = metadata.getUuid();
+        Consumer consumerForUuid = uuidToConsumer.get(uuid);
+        if (consumerForUuid == null) {
+            if (skipChunk(entryAndMetadata, metadata)) {
+                return availablePermits;
+            }
+            consumerForUuid = consumer;
+            uuidToConsumer.put(uuid, consumerForUuid);
+        }

Review Comment:
   It seems that this solution would only skip the first of possibly 
multiple chunk entries. 
   Suppose the entry with chunkId 0 got lost and there were subsequent 
entries with chunkId 1, chunkId 2 and chunkId 3. The entries with chunkId 2 and 
chunkId 3 would get delivered to the client, causing a similar issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to