umustafi commented on code in PR #3570:
URL: https://github.com/apache/gobblin/pull/3570#discussion_r983919763
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -74,7 +76,15 @@ public String load(String key) throws Exception {
protected GobblinServiceJobScheduler scheduler;
public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
- super(topic, config, numThreads);
+ // Differentiate group id for each host
+ super(topic, config.withValue(GROUP_ID_KEY,
+ ConfigValueFactory.fromAnyRef(SPEC_STORE_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
+ numThreads);
+ }
+
+ @Override
+ protected void initializeConsumerClient() {
Review Comment:
I can add a comment explaining it, but moving that logic here is difficult: I
need access to the underlying kafkaConsumer itself to get the partition info
for a given Kafka topic and convert it into the list of Kafka partitions to
assign. Also, the topic is still not set after the `ConsumerClient` is
initialized, so extracting the partitions here would still require changes to
the `GobblinKafkaConsumerClient` interface. That logic is more easily managed
in the consumer client itself.
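
To make the dependency concrete, here is a rough sketch of the conversion I
mean. It is not the Gobblin code: `PartitionMeta`, `AssignedPartition`, and
`RawConsumer` are hypothetical stand-ins that only mirror the shape of Kafka's
`PartitionInfo`, `TopicPartition`, and the consumer's `partitionsFor` /
`assign` calls, so that the example compiles without the Kafka client on the
classpath.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignmentSketch {
  /** Hypothetical stand-in for Kafka's PartitionInfo (topic and partition id only). */
  static final class PartitionMeta {
    final String topic;
    final int partition;

    PartitionMeta(String topic, int partition) {
      this.topic = topic;
      this.partition = partition;
    }
  }

  /** Hypothetical stand-in for Kafka's TopicPartition. */
  static final class AssignedPartition {
    final String topic;
    final int partition;

    AssignedPartition(String topic, int partition) {
      this.topic = topic;
      this.partition = partition;
    }
  }

  /** Hypothetical minimal view of the underlying consumer; not a Gobblin or Kafka type. */
  interface RawConsumer {
    List<PartitionMeta> partitionsFor(String topic);

    void assign(List<AssignedPartition> partitions);
  }

  /** Converts the partition metadata of a topic into the list of partitions to assign. */
  static List<AssignedPartition> toAssignments(List<PartitionMeta> metas) {
    List<AssignedPartition> result = new ArrayList<>();
    for (PartitionMeta meta : metas) {
      result.add(new AssignedPartition(meta.topic, meta.partition));
    }
    return result;
  }

  /** Looks up every partition of the topic and assigns all of them to the consumer. */
  static List<AssignedPartition> assignAllPartitions(RawConsumer consumer, String topic) {
    List<AssignedPartition> partitions = toAssignments(consumer.partitionsFor(topic));
    consumer.assign(partitions);
    return partitions;
  }
}
```

Both steps in `assignAllPartitions` need the raw consumer and the topic name,
which is why this is easier to keep inside the consumer client than in the
monitor.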