Repository: incubator-gobblin Updated Branches: refs/heads/master 6198120e3 -> 43d5ed520
[GOBBLIN-308] Change boot sequence of gobblin cluster to fix the hanging issue Closes #2162 from yukuai518/cluster-stuck Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/43d5ed52 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/43d5ed52 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/43d5ed52 Branch: refs/heads/master Commit: 43d5ed5204930f4b7c68d5171adf572b09ed1413 Parents: 6198120 Author: Kuai Yu <[email protected]> Authored: Tue Nov 7 16:35:03 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Nov 7 16:35:03 2017 -0800 ---------------------------------------------------------------------- .../cluster/StreamingJobConfigurationManager.java | 14 +++++++++----- .../gobblin/service/StreamingKafkaSpecConsumer.java | 10 ++++++---- 2 files changed, 15 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/43d5ed52/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java index 7370dc6..849dd6a 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java @@ -95,11 +95,6 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager { protected void startUp() throws Exception { LOGGER.info("Starting the " + StreamingJobConfigurationManager.class.getSimpleName()); - // if the instance consumer is a service then need to start it to consume job specs - 
if (this.specConsumer instanceof Service) { - ((Service) this.specConsumer).startAsync().awaitRunning(); - } - // submit command to fetch job specs this.fetchJobSpecExecutor.execute(new Runnable() { @Override @@ -116,6 +111,15 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager { } } }); + + // if the instance consumer is a service then need to start it to consume job specs + // IMPORTANT: StreamingKafkaSpecConsumer needs to be launched after a fetching thread is created. + // This is because StreamingKafkaSpecConsumer will invoke addListener(new JobSpecListener()) during startup, + // which will push job specs into a blocking queue _jobSpecQueue. A fetching thread will help to consume the + // blocking queue to prevent a hanging issue. + if (this.specConsumer instanceof Service) { + ((Service) this.specConsumer).startAsync().awaitRunning(); + } } private void fetchJobSpecs() throws ExecutionException, InterruptedException { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/43d5ed52/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java index 7d7b702..23966e9 100644 --- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java @@ -61,7 +61,7 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100; private final AvroJobSpecKafkaJobMonitor _jobMonitor; private final 
BlockingQueue<ImmutablePair<SpecExecutor.Verb, Spec>> _jobSpecQueue; - + private final MutableJobCatalog _jobCatalog; public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) { String topic = config.getString(SPEC_KAFKA_TOPICS_KEY); Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic, @@ -74,11 +74,9 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S throw new RuntimeException("Could not create job monitor", e); } + _jobCatalog = jobCatalog; _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE", DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE)); - - // listener will add job specs to a blocking queue to send to callers of changedSpecs() - jobCatalog.addListener(new JobSpecListener()); } public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) { @@ -116,6 +114,10 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S @Override protected void startUp() { + // listener will add job specs to a blocking queue to send to callers of changedSpecs() + // IMPORTANT: This addListener should be invoked after job catalog has been initialized. This is guaranteed because + // StreamingKafkaSpecConsumer is booted after jobCatalog in GobblinClusterManager::startAppLauncherAndServices() + _jobCatalog.addListener(new JobSpecListener()); _jobMonitor.startAsync().awaitRunning(); }
