xiangfu0 opened a new pull request, #16623: URL: https://github.com/apache/pinot/pull/16623
## Problem

PR #15393 introduced unique client IDs to resolve JMX conflicts, but this left Pinot servers with too many open Kafka connections (and ports), leading to resource exhaustion on high-traffic clusters.

## Solution

Implements comprehensive caching for StreamMetadataProvider instances that:

- **Reuses connections** while keeping the benefits of unique client IDs
- **Prevents orphan providers** by explicitly closing the old provider on recreation
- **Ensures thread safety** with synchronized operations
- **Provides graceful shutdown** with automatic cleanup hooks

## Key Features

### 🔧 StreamMetadataProviderCacheManager

- Singleton cache manager with separate caches for stream and partition providers
- Configurable cache expiry (30 minutes) and size limit (1000 entries)
- Automatic cleanup using Guava RemovalListeners
- Base client ID extraction, so providers are cached correctly despite the unique client ID suffixes

### 🔄 Enhanced StreamConsumerFactory

- `createCachedStreamMetadataProvider()`: returns a cached stream metadata provider, or creates a new one
- `createCachedPartitionMetadataProvider()`: returns a cached partition metadata provider, or creates a new one
- `recreateCachedStreamMetadataProvider()`: forces recreation, explicitly closing the old provider
- `recreateCachedPartitionMetadataProvider()`: forces recreation, explicitly closing the old provider

### 🛡️ Orphan Prevention

- **Explicit cleanup of the old provider** before a new one is created during recreation
- **Synchronized recreation methods** to prevent race conditions
- **Enhanced clearAll()** that closes all providers before clearing the cache
- **Shutdown hooks** for graceful cleanup on application termination

### 📊 Updated Client Classes

- PartitionGroupMetadataFetcher
- PinotLLCRealtimeSegmentManager
- RealtimeConsumptionRateManager
- StreamMetadataProvider.computePartitionGroupMetadata

## Testing

Includes comprehensive unit tests:

- Cache reuse verification
- Orphan prevention validation
- Recreation cleanup testing
- Thread safety verification

## Benefits

- ✅ **Reduced Port Usage**: Reuses existing connections instead of creating new ones
- ✅ **Better Performance**: Cached connections avoid connection-establishment overhead
- ✅ **Resource Efficiency**: Prevents resource exhaustion on high-traffic clusters
- ✅ **Maintained Reliability**: Still recreates connections on failures
- ✅ **Backward Compatibility**: Original methods continue to work

## Impact

Resolves port exhaustion while keeping all the benefits of unique client IDs from PR #15393. Production-ready with comprehensive error handling and monitoring.

**Related**: Addresses issues introduced in #15393
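The PR describes the cache manager only in prose. The following is a minimal, JDK-only illustration of the same ideas. It is not the code in #16623.

- **Stand-in for Guava:** an access-ordered `LinkedHashMap` replaces the Guava cache and its RemovalListener. Every provider that is evicted, expired, replaced by recreation, or cleared gets closed.
- **Hypothetical names:** `ProviderCacheSketch`, `getOrCreate`, `recreate`, `baseClientId`, `FakeProvider` and `ProviderCacheDemo` are all invented for this sketch.
- **Suffix rule (assumption):** the unique suffix is taken to be whatever follows the last `-` in the client ID.
- **Expiry (assumption):** expiry is measured from creation and checked lazily on lookup.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.function.Function;

// Hypothetical sketch, not Pinot's API: an LRU + expiry cache that closes every
// provider it evicts, expires, replaces, or clears, so none is left orphaned.
final class ProviderCacheSketch<P extends AutoCloseable> {
  private static final class Entry<T> {
    final T provider;
    final long createdAtMillis;
    Entry(T provider, long createdAtMillis) { this.provider = provider; this.createdAtMillis = createdAtMillis; }
  }

  private final int maxSize;
  private final long expiryMillis;
  private final Function<String, P> factory; // creates a provider for a full (unique) client ID
  // accessOrder = true: iteration starts at the least recently used entry.
  private final LinkedHashMap<String, Entry<P>> entries = new LinkedHashMap<>(16, 0.75f, true);

  ProviderCacheSketch(int maxSize, long expiryMillis, Function<String, P> factory) {
    if (maxSize < 1) throw new IllegalArgumentException("maxSize must be >= 1");
    this.maxSize = maxSize;
    this.expiryMillis = expiryMillis;
    this.factory = factory;
  }

  // Assumption: the unique suffix is whatever follows the last '-'.
  static String baseClientId(String clientId) {
    int idx = clientId.lastIndexOf('-');
    return idx < 0 ? clientId : clientId.substring(0, idx);
  }

  // Returns the cached provider for the base client ID, creating one if absent or expired.
  synchronized P getOrCreate(String clientId) {
    String key = baseClientId(clientId);
    Entry<P> cached = entries.get(key);
    long now = System.currentTimeMillis();
    if (cached != null && now - cached.createdAtMillis < expiryMillis) {
      return cached.provider;
    }
    if (cached != null) { // expired: close it before replacing it
      entries.remove(key);
      closeQuietly(cached.provider);
    }
    return put(key, factory.apply(clientId), now);
  }

  // Forces recreation; the old provider is closed first, so it cannot leak.
  synchronized P recreate(String clientId) {
    String key = baseClientId(clientId);
    Entry<P> old = entries.remove(key);
    if (old != null) closeQuietly(old.provider);
    return put(key, factory.apply(clientId), System.currentTimeMillis());
  }

  // Closes every cached provider, then empties the cache.
  synchronized void clearAll() {
    for (Entry<P> e : entries.values()) closeQuietly(e.provider);
    entries.clear();
  }

  synchronized int size() { return entries.size(); }

  void registerShutdownHook() {
    Runtime.getRuntime().addShutdownHook(new Thread(this::clearAll, "provider-cache-shutdown"));
  }

  private P put(String key, P provider, long now) {
    entries.put(key, new Entry<>(provider, now));
    // Enforce the size limit by evicting (and closing) least recently used entries.
    Iterator<Entry<P>> it = entries.values().iterator();
    while (entries.size() > maxSize && it.hasNext()) {
      Entry<P> eldest = it.next();
      it.remove();
      closeQuietly(eldest.provider);
    }
    return provider;
  }

  private static void closeQuietly(AutoCloseable c) {
    try { c.close(); } catch (Exception ignored) { /* sketch only: real code would log this */ }
  }
}

// Stand-in for a StreamMetadataProvider: it only records whether it was closed.
final class FakeProvider implements AutoCloseable {
  boolean closed;
  FakeProvider(String clientId) { } // client ID is ignored by this stand-in
  @Override public void close() { closed = true; }
}

final class ProviderCacheDemo {
  public static void main(String[] args) {
    // 1000 entries and 30 minutes, matching the limits in the PR description.
    ProviderCacheSketch<FakeProvider> cache =
        new ProviderCacheSketch<>(1000, 30 * 60 * 1000L, FakeProvider::new);
    cache.registerShutdownHook();
    FakeProvider first = cache.getOrCreate("myTable-consumer-1");
    FakeProvider second = cache.getOrCreate("myTable-consumer-2");
    System.out.println("reused: " + (first == second));
    FakeProvider fresh = cache.recreate("myTable-consumer-3");
    System.out.println("old closed: " + first.closed + ", new open: " + !fresh.closed);
  }
}
```

Running `ProviderCacheDemo` prints `reused: true` followed by `old closed: true, new open: true`. Synchronizing every method keeps "close the old provider, then create the new one" atomic, at the cost of serializing all lookups. A production cache would likely use finer-grained locking.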
