Jackie-Jiang commented on code in PR #15978:
URL: https://github.com/apache/pinot/pull/15978#discussion_r2127486021
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1956,11 +1968,32 @@ private void recreateStreamConsumer(String reason) {
/**
* Creates a new stream metadata provider
*/
- private void createPartitionMetadataProvider(String reason) {
- closePartitionMetadataProvider();
- _segmentLogger.info("Creating new partition metadata provider, reason: {}", reason);
- _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(
- _clientId, _streamPatitionGroupId);
+ private StreamMetadataProvider getOrCreatePartitionMetadataProvider(String reason) {
+ if (_streamConsumerClosed.get()) {
Review Comment:
This can lead to a race condition.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1956,11 +1968,32 @@ private void recreateStreamConsumer(String reason) {
/**
* Creates a new stream metadata provider
*/
- private void createPartitionMetadataProvider(String reason) {
- closePartitionMetadataProvider();
- _segmentLogger.info("Creating new partition metadata provider, reason: {}", reason);
- _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(
- _clientId, _streamPatitionGroupId);
+ private StreamMetadataProvider getOrCreatePartitionMetadataProvider(String reason) {
+ if (_streamConsumerClosed.get()) {
+ _segmentLogger.debug("Skip creating metadata provider – stream is closed ({})", reason);
+ return null;
+ }
+
+ StreamMetadataProvider provider = _partitionMetadataProvider.get();
+ if (provider != null) {
+ return provider;
+ }
+
+ // Try to create one – only the thread that wins the CAS installs it.
+ StreamMetadataProvider newProvider =
+ _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _streamPatitionGroupId);
Review Comment:
Will it cause an exception or warning when two threads try to create it with
the same client id? If so, we can just synchronize on `_partitionMetadataProvider`
to ensure only one thread can create it.
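
For illustration, the synchronized variant suggested above could look roughly like the sketch below. It is not the PR's code: `LazyProviderHolder`, its `Supplier` factory, and the `close()` method are hypothetical stand-ins, and it assumes `_partitionMetadataProvider` is an `AtomicReference` (as the `.get()` call in the diff suggests). The closed check is done inside the same lock, so a concurrent close cannot slip in between the check and the creation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical holder illustrating synchronized get-or-create; not Pinot code. */
class LazyProviderHolder<T> {
  // Mirrors the assumed AtomicReference field; also used as the lock object.
  private final AtomicReference<T> _provider = new AtomicReference<>();
  private final AtomicBoolean _closed = new AtomicBoolean(false);
  // Stand-in for the factory call that creates the metadata provider.
  private final Supplier<T> _factory;

  LazyProviderHolder(Supplier<T> factory) {
    _factory = factory;
  }

  /** Returns the existing provider, creates one if absent, or returns null once closed. */
  T getOrCreate() {
    synchronized (_provider) {
      // Checked under the lock, so close() cannot run between this check and the creation.
      if (_closed.get()) {
        return null;
      }
      T provider = _provider.get();
      if (provider == null) {
        // Only one thread at a time reaches this point, so the factory runs once.
        provider = _factory.get();
        _provider.set(provider);
      }
      return provider;
    }
  }

  /** Marks the holder closed and drops the provider under the same lock. */
  void close() {
    synchronized (_provider) {
      _closed.set(true);
      // A real implementation would also close the underlying provider here.
      _provider.set(null);
    }
  }
}
```

With this shape, concurrent callers of `getOrCreate()` all receive the same instance and the factory is invoked at most once, at the cost of taking the lock on every call.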