navina opened a new issue, #10014:
URL: https://github.com/apache/pinot/issues/10014
Today, the realtime segment data manager creates a Kafka `clientId` for the consumer by concatenating the topic name and the partition id. However, because the same id is reused whenever a Kafka consumer is spun up across components, the Kafka clients register conflicting mbeans [1].
While this does not prevent consumption, it renders the mbeans useless and leads to a lot of warning logs like the ones below:
<details>
<summary>Logs from controller</summary>
```
2022/12/20 14:06:33.217 WARN [AppInfoParser] [pool-10-thread-3] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=PartitionGroupMetadataFetcher-githubEvents
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[kafka-clients-2.8.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) ~[kafka-clients-2.8.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[kafka-clients-2.8.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[kafka-clients-2.8.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626) ~[kafka-clients-2.8.1.jar:?]
    at org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.<init>(KafkaPartitionLevelConnectionHandler.java:64) ~[classes/:?]
    at org.apache.pinot.plugin.stream.kafka20.KafkaStreamMetadataProvider.<init>(KafkaStreamMetadataProvider.java:54) ~[classes/:?]
    at org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory.createPartitionMetadataProvider(KafkaConsumerFactory.java:43) ~[classes/:?]
    at org.apache.pinot.spi.stream.StreamMetadataProvider.computePartitionGroupMetadata(StreamMetadataProvider.java:83) ~[classes/:?]
    at org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher.call(PartitionGroupMetadataFetcher.java:69) ~[classes/:?]
    at org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher.call(PartitionGroupMetadataFetcher.java:31) ~[classes/:?]
    at org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:50) ~[classes/:?]
    at org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder.getPartitionGroupMetadataList(PinotTableIdealStateBuilder.java:160) ~[classes/:?]
    at org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder.<init>(MissingConsumingSegmentFinder.java:79) ~[classes/:?]
    at org.apache.pinot.controller.helix.SegmentStatusChecker.updateSegmentMetrics(SegmentStatusChecker.java:307) ~[classes/:?]
    at org.apache.pinot.controller.helix.SegmentStatusChecker.processTable(SegmentStatusChecker.java:120) ~[classes/:?]
    at org.apache.pinot.controller.helix.SegmentStatusChecker.processTable(SegmentStatusChecker.java:61) ~[classes/:?]
    at org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask.processTables(ControllerPeriodicTask.java:116) ~[classes/:?]
    at org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask.runTask(ControllerPeriodicTask.java:85) ~[classes/:?]
    at org.apache.pinot.core.periodictask.BasePeriodicTask.run(BasePeriodicTask.java:150) ~[classes/:?]
    at org.apache.pinot.core.periodictask.BasePeriodicTask.run(BasePeriodicTask.java:135) ~[classes/:?]
    at org.apache.pinot.core.periodictask.PeriodicTaskScheduler.lambda$start$0(PeriodicTaskScheduler.java:87) ~[classes/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
2022/12/20 14:06:33.217 INFO [KafkaConsumer] [pool-10-thread-3] [Consumer clientId=PartitionGroupMetadataFetcher-githubEvents, groupId=null] Subscribed to partition(s): githubEvents-0
```
</details>
<details>
<summary>Logs from server</summary>
```
2022/12/19 15:24:39.465 WARN [AppInfoParser] [HelixTaskExecutor-message_handle_thread_57] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=upsertMeetupRSVPEvents-0
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) [kafka-clients-2.8.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) [kafka-clients-2.8.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) [kafka-clients-2.8.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) [kafka-clients-2.8.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626) [kafka-clients-2.8.1.jar:?]
    at org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.<init>(KafkaPartitionLevelConnectionHandler.java:64) [classes/:?]
    at org.apache.pinot.plugin.stream.kafka20.KafkaStreamMetadataProvider.<init>(KafkaStreamMetadataProvider.java:54) [classes/:?]
    at org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory.createPartitionMetadataProvider(KafkaConsumerFactory.java:43) [classes/:?]
    at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.createPartitionMetadataProvider(LLRealtimeSegmentDataManager.java:1584) [classes/:?]
    at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1437) [classes/:?]
    at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:355) [classes/:?]
    at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:178) [classes/:?]
    at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:161) [classes/:?]
    at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:83) [classes/:?]
    at jdk.internal.reflect.GeneratedMethodAccessor483.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:350) [helix-core-1.0.4.jar:1.0.4]
    at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:278) [helix-core-1.0.4.jar:1.0.4]
    at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) [helix-core-1.0.4.jar:1.0.4]
    at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) [helix-core-1.0.4.jar:1.0.4]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
```
</details>
This breaks a fundamental assumption made by Kafka clients: every client instance has a unique identifier, denoted by `client.id`. Fixing this will allow users to leverage the useful metrics exposed by the Kafka clients.
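A minimal sketch of one possible direction (not the committed fix): make the `client.id` unique per consumer instance by adding more context than just topic and partition, e.g. the table name plus a per-instance suffix. The helper name `buildUniqueClientId` and the table/topic names below are illustrative only, not actual Pinot code.

```java
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class UniqueClientIdExample {

  // Hypothetical helper: today the clientId is effectively "<topic>-<partition>",
  // which collides across components; appending the table name and a random suffix
  // keeps every KafkaConsumer's kafka.consumer:type=app-info mbean unique.
  static String buildUniqueClientId(String tableNameWithType, String topic, int partition) {
    return tableNameWithType + "-" + topic + "-" + partition + "-" + UUID.randomUUID();
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG,
        buildUniqueClientId("githubEvents_REALTIME", "githubEvents", 0));
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    // With a unique client.id, each consumer registers its own app-info mbean and
    // InstanceAlreadyExistsException is no longer thrown.
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // ... assign partitions and consume as usual ...
    }
  }
}
```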
[1] There is scope for improvement here: the Kafka connection handler should spin up a Kafka admin client instead of using the consumer client to fetch metadata. Moreover, since `KafkaStreamMetadataProvider` extends the connection handler, many short-lived consumers get created, which can put undue burden on resources. Will create another issue to handle this.
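As a rough illustration of the admin-client idea (an assumption about the direction, not the actual change): partition metadata can be fetched with `AdminClient.describeTopics` instead of instantiating a short-lived `KafkaConsumer` for every metadata call.

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class PartitionCountViaAdminClient {

  // Sketch: fetch the partition count for a topic without creating a KafkaConsumer.
  static int fetchPartitionCount(String bootstrapServers, String topic)
      throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // The admin client's client.id should also be unique in practice.
    props.put(AdminClientConfig.CLIENT_ID_CONFIG, "metadata-fetcher-" + topic);

    try (AdminClient adminClient = AdminClient.create(props)) {
      TopicDescription description =
          adminClient.describeTopics(Collections.singletonList(topic)).all().get().get(topic);
      return description.partitions().size();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(fetchPartitionCount("localhost:9092", "githubEvents"));
  }
}
```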