umustafi commented on code in PR #3570:
URL: https://github.com/apache/gobblin/pull/3570#discussion_r983919763
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -74,7 +76,15 @@ public String load(String key) throws Exception {
protected GobblinServiceJobScheduler scheduler;
public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
- super(topic, config, numThreads);
+ // Differentiate group id for each host
+ super(topic, config.withValue(GROUP_ID_KEY,
+ ConfigValueFactory.fromAnyRef(SPEC_STORE_CHANGE_MONITOR_PREFIX +
UUID.randomUUID().toString())),
+ numThreads);
+ }
+
+ @Override
+ protected void initializeConsumerClient() {
Review Comment:
I can add that comment to explain it but it is difficult to move that logic
here as I need access to the underlying kafkaConsumer itself to get the
Partition Info for a particular kafka topic and then convert it to the list of
kafka partitions I need to set it to. That logic is more easily managed in the
consumer client itself.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]