rseetham commented on code in PR #17669:
URL: https://github.com/apache/pinot/pull/17669#discussion_r3306665160
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -147,4 +163,34 @@ private Boolean fetchMultipleStreams()
}
return Boolean.TRUE;
}
+
+ /**
+ * Fetches available topic names from the stream provider.
+ * Uses the first stream config to fetch topics.
+ *
+ * @return Set of available topic names, or null if topics could not be fetched or skip missing topics is disabled
+ */
+ private Set<String> fetchAvailableTopicNames() {
+ if (!_multitopicSkipMissingTopics || _streamConfigs.isEmpty()) {
+ return null;
+ }
+
+ StreamConfig streamConfigForTopicFetch = _streamConfigs.get(0);
+
+ String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-topicFetch-"
+ + streamConfigForTopicFetch.getTableNameWithType();
+ StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfigForTopicFetch);
+
+ try (StreamMetadataProvider provider = factory.createStreamMetadataProvider(clientId)) {
+ return provider.getTopics().stream()
+ .map(StreamMetadataProvider.TopicMetadata::getName)
+ .collect(Collectors.toSet());
+ } catch (UnsupportedOperationException e) {
+ LOGGER.debug("getTopics() not supported for stream type, skipping topic existence validation");
+ return null;
+ } catch (Exception e) {
Review Comment:
Even a TransientConsumerException will get flagged as a missing topic here,
which is not true: a transient failure does not tell us whether the topic
exists. So you can't mark the topic as missing on every exception.
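
One possible shape for this, as a rough sketch rather than a proposed patch: treat a topic as missing only when the topic listing actually succeeded and did not contain it, and treat any failed listing as "unknown". Everything below is hypothetical and only illustrates the idea: `TopicFetchSketch`, `TransientFetchException` (a local stand-in for TransientConsumerException), `fetchAvailableTopics`, and `isConfirmedMissing` are not existing Pinot names, and the `Callable` stands in for the `provider.getTopics()` call.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;

// Hypothetical sketch; none of these names exist in Pinot.
final class TopicFetchSketch {

  /** Local stand-in for a transient stream error such as TransientConsumerException. */
  static class TransientFetchException extends RuntimeException {
    TransientFetchException(String message) {
      super(message);
    }
  }

  /**
   * Runs the topic listing. Returns the topic names on success, or
   * Optional.empty() when availability could not be determined.
   */
  static Optional<Set<String>> fetchAvailableTopics(Callable<Set<String>> topicLister) {
    try {
      return Optional.of(topicLister.call());
    } catch (UnsupportedOperationException | TransientFetchException e) {
      // Listing unsupported, or the call failed transiently:
      // we learned nothing about which topics exist.
      return Optional.empty();
    } catch (Exception e) {
      // Any other failure is also treated as "unknown" here, not as "missing".
      return Optional.empty();
    }
  }

  /** A topic is confirmed missing only if the listing succeeded and lacks it. */
  static boolean isConfirmedMissing(String topic, Optional<Set<String>> availableTopics) {
    return availableTopics.isPresent() && !availableTopics.get().contains(topic);
  }

  public static void main(String[] args) {
    Optional<Set<String>> listed = fetchAvailableTopics(() -> Set.of("orders"));
    Optional<Set<String>> failed = fetchAvailableTopics(() -> {
      throw new TransientFetchException("broker timeout");
    });
    System.out.println(isConfirmedMissing("payments", listed));
    System.out.println(isConfirmedMissing("payments", failed));
  }
}
```

With this shape, a caller would skip a topic only when `isConfirmedMissing` returns true, and would fall back to the normal (non-skipping) path whenever the listing result is empty.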