Repository: kafka Updated Branches: refs/heads/trunk 28ecea421 -> 478505632
kafka-2264; SESSION_TIMEOUT_MS_CONFIG in ConsumerConfig should be int; patched by Manikumar Reddy; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7009f1d6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7009f1d6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7009f1d6 Branch: refs/heads/trunk Commit: 7009f1d6fffe3866723d1d33a28a4572053eb4e5 Parents: 28ecea4 Author: Manikumar Reddy <manikumar.re...@gmail.com> Authored: Tue Jun 16 15:30:52 2015 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Tue Jun 16 15:30:52 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/clients/consumer/ConsumerConfig.java | 2 +- .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 2 +- .../apache/kafka/clients/consumer/internals/Coordinator.java | 6 +++--- .../kafka/clients/consumer/internals/CoordinatorTest.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 1e90524..daff34d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -167,7 +167,7 @@ public class ConsumerConfig extends AbstractConfig { CommonClientConfigs.BOOSTRAP_SERVERS_DOC) .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC) .define(SESSION_TIMEOUT_MS_CONFIG, - Type.LONG, + Type.INT, 30000, Importance.HIGH, SESSION_TIMEOUT_MS_DOC) http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d1d1ec1..951c34c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -484,7 +484,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.coordinator = new Coordinator(this.client, config.getString(ConsumerConfig.GROUP_ID_CONFIG), this.retryBackoffMs, - config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), + config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), this.metadata, this.subscriptions, http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index c1496a0..41cb945 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -69,7 +69,7 @@ public final class Coordinator { private final String groupId; private final Metadata metadata; private final Heartbeat heartbeat; - private final long sessionTimeoutMs; + private final int sessionTimeoutMs; private final String assignmentStrategy; private final SubscriptionState subscriptions; private final CoordinatorMetrics sensors; @@ -84,7 +84,7 @@ public final class Coordinator { public Coordinator(KafkaClient client, String groupId, long retryBackoffMs, - long sessionTimeoutMs, + int sessionTimeoutMs, String assignmentStrategy, Metadata metadata, SubscriptionState subscriptions, @@ -123,7 +123,7 @@ public final class Coordinator { // repeat processing the response until succeed or fatal error do { JoinGroupRequest request = new JoinGroupRequest(groupId, - (int) this.sessionTimeoutMs, + this.sessionTimeoutMs, subscribedTopics, this.consumerId, this.assignmentStrategy); http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index b06c4a7..1454ab7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -50,7 +50,7 @@ public class CoordinatorTest { private String groupId = "test-group"; private TopicPartition tp = new TopicPartition(topicName, 0); private long retryBackoffMs = 0L; - private long sessionTimeoutMs = 10L; + private int sessionTimeoutMs = 10; private String rebalanceStrategy = "not-matter"; private MockTime time = new MockTime(); private MockClient client = new MockClient(time);