Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 529799cc4 -> 3c0ebb13f
[GEARPUMP-197] fix busy loop in FetchThread

Author: manuzhang <[email protected]>

Closes #77 from manuzhang/fix_fetch_thread.

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/3c0ebb13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/3c0ebb13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/3c0ebb13

Branch: refs/heads/master
Commit: 3c0ebb13f64ee6b64623668dec6d3923d6f09cf6
Parents: 529799c
Author: manuzhang <[email protected]>
Authored: Tue Aug 30 10:24:24 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Aug 30 10:24:24 2016 +0800

----------------------------------------------------------------------
 .../gearpump/streaming/kafka/util/KafkaConfig.java  |  3 +--
 .../kafka/lib/source/consumer/FetchThread.scala     | 16 +++++++++-------
 2 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/3c0ebb13/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
index 8c931cd..451faec 100644
--- a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
+++ b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
@@ -155,7 +155,7 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
             CONSUMER_START_OFFSET_DOC)
         .define(ENABLE_AUTO_COMMIT_CONFIG,
             ConfigDef.Type.BOOLEAN,
-            true,
+            false,
             ConfigDef.Importance.MEDIUM,
             ENABLE_AUTO_COMMIT_DOC)
         .define(CHECKPOINT_STORE_NAME_PREFIX_CONFIG,
@@ -209,7 +209,6 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
     if (!props.containsKey(GROUP_ID_CONFIG)) {
       props.put(GROUP_ID_CONFIG, getString(GROUP_ID_CONFIG));
     }
-    props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
 
     return new ConsumerConfig(props);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/3c0ebb13/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
index 3119f40..49c116c 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
@@ -116,9 +116,9 @@ private[kafka] class FetchThread(
             resetConsumers(nextOffsets)
             reset = false
           }
-          val hasMoreMessages = fetchMessage
+          val fetchMore: Boolean = fetchMessage
           sleeper.reset()
-          if (!hasMoreMessages) {
+          if (!fetchMore) {
             // sleep for given duration
             sleeper.sleep(fetchSleepMS)
           }
@@ -133,19 +133,21 @@ private[kafka] class FetchThread(
 
   /**
    * fetch message from each TopicAndPartition in a round-robin way
+   *
+   * @return whether to fetch more messages
    */
   private def fetchMessage: Boolean = {
-    consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
-      val (_, consumer) = tpAndConsumer
-      if (incomingQueue.size < fetchThreshold) {
+    if (incomingQueue.size >= fetchThreshold) {
+      false
+    } else {
+      consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
+        val (_, consumer) = tpAndConsumer
         if (consumer.hasNext) {
           incomingQueue.put(consumer.next())
           true
         } else {
           hasNext
         }
-      } else {
-        true
       }
     }
   }
