Technoboy- opened a new pull request, #16918:
URL: https://github.com/apache/pulsar/pull/16918

   Cherry-pick #16917 16205
   ### Motivation
   
   ```
   06:03:58.778 [broker-topic-workers-OrderedScheduler-4-0] ERROR 
org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught
   java.lang.IllegalArgumentException: null
           at 
com.google.common.base.Preconditions.checkArgument(Preconditions.java:128)
           at 
org.apache.pulsar.broker.service.persistent.CompactorSubscription.acknowledgeMessage(CompactorSubscription.java:61)
 
           at 
org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:154)
 
           at 
org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:103)
           at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:203)
 
           at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:146)
 
           at 
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) 
           at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) 
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
           at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
           at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
           at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 
           at java.lang.Thread.run(Thread.java:829) [?:?]
   ```
   
   If the topic enabled transaction or replicated cluster,  the original topic 
will write some Marker msg. Then if enabled compaction, it will trigger 
CompactionSubscription to acknowledge failure due to inconsistent `AckType`:
   
   line-178 will ack as `Individual`:
   
https://github.com/apache/pulsar/blob/c871d2433acad7f6fc6fd37c32102fc7578b8c2d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L168-L181
   
   But CompactorSubscription only supports `Cumulative `:
   
https://github.com/apache/pulsar/blob/c871d2433acad7f6fc6fd37c32102fc7578b8c2d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L60-L64
   
   Above is the root cause. 
   
   Then if occurs this error, it may cause the broker OOM, because some entries 
are not released. 
   
   
   ### Modification
   
   - Check subscription type when acknowledge.
   
   ### Verifying this change
   
   - [x] Add a new test to cover this change.
   
   ### Documentation
   
   - [x] `doc-not-needed` 
   (Please explain why)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to