[
https://issues.apache.org/jira/browse/KAFKA-5760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Julius updated KAFKA-5760:
--------------------------
Description:
When I poll the message, I set the timeout, but it has been blocked in the poll
method,My kafka Cluster is normal, the test topic has 30 partition.
Through debugg found, has been doing ConsumerNetworkClient.poll method:
/**
* Block indefinitely until the given request future has finished.
* @param future The request future to await.
* @throws WakeupException if {@link #wakeup()} is called from another
thread
* @throws InterruptException if the calling thread is interrupted
*/
public void poll(RequestFuture<?> future) {
while (!future.isDone())
poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future);
}
I don't know reason, trouble you to confirm, thank you。
Java Code:
public static Properties getProperties(String groupId, String
brokerList) {
Properties properties = new Properties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerList);
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.name().toLowerCase());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringDeserializer.class.getName());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
String.valueOf(false));
return properties;
}
public static void main(String[] args) throws InterruptedException,
JMSException {
List<KafkaConsumer<String, String>> kafkaConsumers = new
ArrayList<>();
for(int i = 0 ; i < 10 ; i++){
Properties props =
getProperties("34620a79-f68b-4129-88f1-0e96bb14194e", "*.*.*.*:9092");
KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
kafkaConsumers.add(consumer);
}
ExecutorService executorService =
Executors.newFixedThreadPool(3);
for (int i = 0; i < 30; i++) {
final KafkaConsumer<String, String> consumer =
kafkaConsumers.get(i % kafkaConsumers.size());
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " start get message ,
consumer: " + consumer.toString());
System.out.println(consumer.assignment());
ConsumerRecords<String, String> records
= consumer.poll(200);
System.out.println(Thread.currentThread().getName() + " poll end, consumer: "
+ consumer.toString());
for (ConsumerRecord<String, String>
record : records) {
System.out.printf("offset = %d,
pattition = %d, key = %s, value = %s", record.offset(),record.partition(),
record.key(), record.value());
System.out.println(Thread.currentThread().getName());
consumer.commitSync();
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
}
was:
When I poll the message, I set the timeout, but it has been blocked in the poll
method,My kafka Cluster is normal.
Through debugg found, has been doing ConsumerNetworkClient.poll method:
/**
* Block indefinitely until the given request future has finished.
* @param future The request future to await.
* @throws WakeupException if {@link #wakeup()} is called from another
thread
* @throws InterruptException if the calling thread is interrupted
*/
public void poll(RequestFuture<?> future) {
while (!future.isDone())
poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future);
}
I don't know reason, trouble you to confirm, thank you。
Java Code:
public static Properties getProperties(String groupId, String
brokerList) {
Properties properties = new Properties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerList);
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.name().toLowerCase());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringDeserializer.class.getName());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
String.valueOf(false));
return properties;
}
public static void main(String[] args) throws InterruptedException,
JMSException {
List<KafkaConsumer<String, String>> kafkaConsumers = new
ArrayList<>();
for(int i = 0 ; i < 10 ; i++){
Properties props =
getProperties("34620a79-f68b-4129-88f1-0e96bb14194e", "*.*.*.*:9092");
KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
kafkaConsumers.add(consumer);
}
ExecutorService executorService =
Executors.newFixedThreadPool(3);
for (int i = 0; i < 30; i++) {
final KafkaConsumer<String, String> consumer =
kafkaConsumers.get(i % kafkaConsumers.size());
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " start get message ,
consumer: " + consumer.toString());
System.out.println(consumer.assignment());
ConsumerRecords<String, String> records
= consumer.poll(200);
System.out.println(Thread.currentThread().getName() + " poll end, consumer: "
+ consumer.toString());
for (ConsumerRecord<String, String>
record : records) {
System.out.printf("offset = %d,
pattition = %d, key = %s, value = %s", record.offset(),record.partition(),
record.key(), record.value());
System.out.println(Thread.currentThread().getName());
consumer.commitSync();
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
}
> The kafka Consumer can not poll the message by multi-thread
> -----------------------------------------------------------
>
> Key: KAFKA-5760
> URL: https://issues.apache.org/jira/browse/KAFKA-5760
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.0
> Reporter: Julius
>
> When I poll the message, I set the timeout, but it has been blocked in the
> poll method,My kafka Cluster is normal, the test topic has 30 partition.
> Through debugg found, has been doing ConsumerNetworkClient.poll method:
> /**
> * Block indefinitely until the given request future has finished.
> * @param future The request future to await.
> * @throws WakeupException if {@link #wakeup()} is called from another
> thread
> * @throws InterruptException if the calling thread is interrupted
> */
> public void poll(RequestFuture<?> future) {
> while (!future.isDone())
> poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future);
> }
> I don't know reason, trouble you to confirm, thank you。
> Java Code:
> public static Properties getProperties(String groupId, String
> brokerList) {
> Properties properties = new Properties();
> properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
> properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
> brokerList);
> properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
> properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> OffsetResetStrategy.EARLIEST.name().toLowerCase());
> properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> org.apache.kafka.common.serialization.StringDeserializer.class.getName());
> properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> org.apache.kafka.common.serialization.StringDeserializer.class.getName());
> properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> String.valueOf(false));
> return properties;
> }
> public static void main(String[] args) throws InterruptedException,
> JMSException {
> List<KafkaConsumer<String, String>> kafkaConsumers = new
> ArrayList<>();
> for(int i = 0 ; i < 10 ; i++){
> Properties props =
> getProperties("34620a79-f68b-4129-88f1-0e96bb14194e", "*.*.*.*:9092");
> KafkaConsumer<String, String> consumer = new
> KafkaConsumer<>(props);
> consumer.subscribe(Collections.singletonList("test"));
> kafkaConsumers.add(consumer);
> }
> ExecutorService executorService =
> Executors.newFixedThreadPool(3);
> for (int i = 0; i < 30; i++) {
> final KafkaConsumer<String, String> consumer =
> kafkaConsumers.get(i % kafkaConsumers.size());
> executorService.execute(new Runnable() {
> @Override
> public void run() {
>
> System.out.println(Thread.currentThread().getName() + " start get message ,
> consumer: " + consumer.toString());
>
> System.out.println(consumer.assignment());
> ConsumerRecords<String, String> records
> = consumer.poll(200);
>
> System.out.println(Thread.currentThread().getName() + " poll end, consumer:
> " + consumer.toString());
> for (ConsumerRecord<String, String>
> record : records) {
> System.out.printf("offset = %d,
> pattition = %d, key = %s, value = %s", record.offset(),record.partition(),
> record.key(), record.value());
>
> System.out.println(Thread.currentThread().getName());
> consumer.commitSync();
> try {
> Thread.sleep(2000);
> }
> catch (InterruptedException e) {
> e.printStackTrace();
> }
> }
> }
> });
> }
> }
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)