navina opened a new issue, #10014:
URL: https://github.com/apache/pinot/issues/10014

   Today, the realtime segment data manager creates the Kafka `clientId` for a consumer by concatenating the topic name and partition id. However, this leads to conflicting MBeans from Kafka clients as we spin up Kafka consumers across components [1].
    
   While this does not prevent consumption, it renders the MBeans useless and produces a lot of warning logs like this:
   <details>
   <summary>Logs from controller</summary>
   
   ```
   2022/12/20 14:06:33.217 WARN [AppInfoParser] [pool-10-thread-3] Error 
registering AppInfo mbean
   javax.management.InstanceAlreadyExistsException: 
kafka.consumer:type=app-info,id=PartitionGroupMetadataFetcher-githubEvents
           at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) 
~[?:?]
           at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
 ~[?:?]
           at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
 ~[?:?]
           at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
 ~[?:?]
           at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
 ~[?:?]
           at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) 
~[?:?]
           at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
 ~[kafka-clients-2.8.1.jar:?]
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) 
~[kafka-clients-2.8.1.jar:?]
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) 
~[kafka-clients-2.8.1.jar:?]
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) 
~[kafka-clients-2.8.1.jar:?]
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626) 
~[kafka-clients-2.8.1.jar:?]
           at 
org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.<init>(KafkaPartitionLevelConnectionHandler.java:64)
 ~[classes/:?]
           at 
org.apache.pinot.plugin.stream.kafka20.KafkaStreamMetadataProvider.<init>(KafkaStreamMetadataProvider.java:54)
 ~[classes/:?]
           at 
org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory.createPartitionMetadataProvider(KafkaConsumerFactory.java:43)
 ~[classes/:?]
           at 
org.apache.pinot.spi.stream.StreamMetadataProvider.computePartitionGroupMetadata(StreamMetadataProvider.java:83)
 ~[classes/:?]
           at 
org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher.call(PartitionGroupMetadataFetcher.java:69)
 ~[classes/:?]
           at 
org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher.call(PartitionGroupMetadataFetcher.java:31)
 ~[classes/:?]
           at 
org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:50)
 ~[classes/:?]
           at 
org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder.getPartitionGroupMetadataList(PinotTableIdealStateBuilder.java:160)
 ~[classes/:?]
           at 
org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder.<init>(MissingConsumingSegmentFinder.java:79)
 ~[classes/:?]
           at 
org.apache.pinot.controller.helix.SegmentStatusChecker.updateSegmentMetrics(SegmentStatusChecker.java:307)
 ~[classes/:?]
           at 
org.apache.pinot.controller.helix.SegmentStatusChecker.processTable(SegmentStatusChecker.java:120)
 ~[classes/:?]
           at 
org.apache.pinot.controller.helix.SegmentStatusChecker.processTable(SegmentStatusChecker.java:61)
 ~[classes/:?]
           at 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask.processTables(ControllerPeriodicTask.java:116)
 ~[classes/:?]
           at 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask.runTask(ControllerPeriodicTask.java:85)
 ~[classes/:?]
           at 
org.apache.pinot.core.periodictask.BasePeriodicTask.run(BasePeriodicTask.java:150)
 ~[classes/:?]
           at 
org.apache.pinot.core.periodictask.BasePeriodicTask.run(BasePeriodicTask.java:135)
 ~[classes/:?]
           at 
org.apache.pinot.core.periodictask.PeriodicTaskScheduler.lambda$start$0(PeriodicTaskScheduler.java:87)
 ~[classes/:?]
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) 
[?:?]
           at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
 [?:?]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
           at java.lang.Thread.run(Thread.java:829) [?:?]
   2022/12/20 14:06:33.217 INFO [KafkaConsumer] [pool-10-thread-3] [Consumer 
clientId=PartitionGroupMetadataFetcher-githubEvents, groupId=null] Subscribed 
to partition(s): githubEvents-0
   ```
   </details>
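   The `InstanceAlreadyExistsException` above can be reproduced in isolation against the platform MBean server; the `ObjectName` below mimics the one Kafka's `AppInfoParser` derives from a shared `client.id` (the class and method names here are hypothetical, for illustration only):

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MBeanConflictDemo {
    // Minimal standard MBean: class Dummy paired with interface DummyMBean.
    public interface DummyMBean { }
    public static class Dummy implements DummyMBean { }

    // Registers the same ObjectName twice, as two Kafka consumers sharing a
    // client.id would; returns true if the second registration is rejected.
    static boolean registerTwice() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name =
                new ObjectName("kafka.consumer:type=app-info,id=githubEvents-0");
        server.registerMBean(new Dummy(), name);      // first consumer: succeeds
        try {
            server.registerMBean(new Dummy(), name);  // second consumer: collides
            return false;                             // unexpected: no conflict
        } catch (InstanceAlreadyExistsException e) {
            return true;                              // same failure as the logs
        } finally {
            server.unregisterMBean(name);             // clean up first registration
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(registerTwice());
    }
}
```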
   
   
   
   <details>
   <summary>Logs from server</summary>
   
   ```
   2022/12/19 15:24:39.465 WARN [AppInfoParser] 
[HelixTaskExecutor-message_handle_thread_57] Error registering AppInfo mbean
   javax.management.InstanceAlreadyExistsException: 
kafka.consumer:type=app-info,id=upsertMeetupRSVPEvents-0
           at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) 
~[?:?]
           at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
 ~[?:?]
           at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
 ~[?:?]
           at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
 ~[?:?]
           at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
 ~[?:?]
           at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) 
~[?:?]
           at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
 [kafka-clients-2.8.1.jar:?]
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) 
[kafka-clients-2.8.1.jar:?]
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) 
[kafka-clients-2.8.1.jar:?]
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) 
[kafka-clients-2.8.1.jar:?]
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626) 
[kafka-clients-2.8.1.jar:?]
           at 
org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.<init>(KafkaPartitionLevelConnectionHandler.java:64)
 [classes/:?]
           at 
org.apache.pinot.plugin.stream.kafka20.KafkaStreamMetadataProvider.<init>(KafkaStreamMetadataProvider.java:54)
 [classes/:?]
           at 
org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory.createPartitionMetadataProvider(KafkaConsumerFactory.java:43)
 [classes/:?]
           at 
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.createPartitionMetadataProvider(LLRealtimeSegmentDataManager.java:1584)
 [classes/:?]
           at 
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1437)
 [classes/:?]
           at 
org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:355)
 [classes/:?]
           at 
org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:178)
 [classes/:?]
           at 
org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:161)
 [classes/:?]
           at 
org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:83)
 [classes/:?]
           at jdk.internal.reflect.GeneratedMethodAccessor483.invoke(Unknown 
Source) ~[?:?]
           at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
           at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
           at 
org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:350)
 [helix-core-1.0.4.jar:1.0.4]
           at 
org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:278)
 [helix-core-1.0.4.jar:1.0.4]
           at 
org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) 
[helix-core-1.0.4.jar:1.0.4]
           at 
org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) 
[helix-core-1.0.4.jar:1.0.4]
           at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
           at java.lang.Thread.run(Thread.java:829) [?:?]
   
   ```
   </details>
   
   This breaks a fundamental assumption made by Kafka clients: that every client instance has a unique identifier, denoted by `client.id`. Fixing it will allow users to leverage the useful metrics exposed by Kafka clients.
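   One hedged way to restore that uniqueness (hypothetical, not necessarily the fix Pinot will adopt) is to keep the readable topic-partition prefix but append a per-instance suffix, so each `KafkaConsumer` registers a distinct `app-info` MBean:

```java
import java.util.UUID;

public class UniqueClientId {
    // Existing scheme described above: topic + "-" + partition.
    // Every component consuming githubEvents-0 gets the same client.id.
    static String currentClientId(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Sketch of a fix: per-instance suffix makes each client.id unique,
    // so AppInfoParser no longer hits InstanceAlreadyExistsException.
    static String uniqueClientId(String topic, int partition) {
        return topic + "-" + partition + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(currentClientId("githubEvents", 0)); // githubEvents-0
        // Two instances no longer share an id:
        System.out.println(uniqueClientId("githubEvents", 0)
                .equals(uniqueClientId("githubEvents", 0)));    // false
    }
}
```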
   
   
   [1] There is scope for improvement here: the Kafka connection handler should spin up a Kafka admin client instead of using a consumer client to fetch metadata. Moreover, since `KafkaStreamMetadataProvider` "extends" the connection handler, many short-lived consumers get created, which can put undue burden on resources. Will create another issue to handle this.
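   As a rough illustration of the footnote's first point, partition metadata can be fetched with an admin client rather than a consumer. This is only a sketch, not the eventual change: it assumes kafka-clients 2.8 on the classpath and a reachable broker, and `AdminMetadataSketch`/`partitionCount` are hypothetical names.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class AdminMetadataSketch {
    static int partitionCount(String bootstrapServers, String topic) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin
                    .describeTopics(Collections.singleton(topic))
                    .all().get()
                    .get(topic);
            // Partition metadata without instantiating a KafkaConsumer,
            // so no app-info MBean churn from short-lived consumers.
            return description.partitions().size();
        }
    }
}
```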
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

