hangc0276 opened a new pull request #9493:
URL: https://github.com/apache/pulsar/pull/9493
### Motivation
When configure broker.conf with
`brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor`
and run KOP with Pulsar Broker, we get the following exception:
```
16:44:20.374 [pulsar-msg-expiry-monitor-27-1] WARN
org.apache.pulsar.broker.service.persistent.PersistentTopic -
[persistent://public/__kafka/__consumer_offsets-partition-36] Error while
getting the oldest mess
age
java.lang.IllegalStateException: Field 'broker_timestamp' is not set
at
org.apache.pulsar.common.api.proto.BrokerEntryMetadata.getBrokerTimestamp(BrokerEntryMetadata.java:14)
~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at
org.apache.pulsar.client.impl.MessageImpl.isExpired(MessageImpl.java:299)
~[org.apache.pulsar-pulsar-client-original-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:2193)
~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at
org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:891)
~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$34(PersistentTopic.java:1245)
~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at java.util.ArrayList.forEach(ArrayList.java:1257) ~[?:1.8.0_172]
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1245)
~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_172]
at
org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$51(BrokerService.java:1472)
~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387)
~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at
org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:1470)
~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at
org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:1411)
~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
[org.apache.pulsar-managed-ledger-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
[org.apache.bookkeeper-bookkeeper-common-4.12.1.jar:4.12.1]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_172]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
[?:1.8.0_172]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_172]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[?:1.8.0_172]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_172]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_172]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[io.netty-netty-common-4.1.51.Final.jar:4.1.51.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
```
The reason is that it doesn't check whether `brokerTimestamp` has been set
in brokerEntryMetadata, which is set by `
org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor`
```Java
public boolean isExpired(int messageTTLInSeconds) {
return messageTTLInSeconds != 0 && (brokerEntryMetadata == null
? (System.currentTimeMillis() >
getPublishTime() +
TimeUnit.SECONDS.toMillis(messageTTLInSeconds))
: (System.currentTimeMillis() >
brokerEntryMetadata.getBrokerTimestamp() +
TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
}
```
### Modification
1. check whether brokerTimestamp has been set before call
`getBrokerTimestamp` method.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]