[ 
https://issues.apache.org/jira/browse/KAFKA-5760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julius updated KAFKA-5760:
--------------------------
    Description: 
When I poll for messages I set a timeout, but the call stays blocked in the 
poll method. My Kafka cluster is healthy, and the test topic has 30 partitions.
While debugging, I found that the thread is stuck in the following 
ConsumerNetworkClient.poll method:
    /**
     * Block indefinitely until the given request future has finished.
     * @param future The request future to await.
     * @throws WakeupException if {@link #wakeup()} is called from another 
thread
     * @throws InterruptException if the calling thread is interrupted
     */
    public void poll(RequestFuture<?> future) {
        while (!future.isDone())
            poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future);
    }
I don't know the reason for this; could you please take a look? Thank you.

Java Code:
        public static Properties getProperties(String groupId, String 
brokerList) {
                Properties properties = new Properties();
                properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerList);
                properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
                properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
OffsetResetStrategy.EARLIEST.name().toLowerCase());
                properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
org.apache.kafka.common.serialization.StringDeserializer.class.getName());
                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
org.apache.kafka.common.serialization.StringDeserializer.class.getName());
                properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
String.valueOf(false));
                return properties;
        }

public static void main(String[] args) throws InterruptedException, 
JMSException {
                List<KafkaConsumer<String, String>> kafkaConsumers = new 
ArrayList<>();
                for(int i = 0 ; i < 10 ; i++){
                        Properties props = 
getProperties("34620a79-f68b-4129-88f1-0e96bb14194e", "*.*.*.*:9092");
                        KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(props);
                        consumer.subscribe(Collections.singletonList("test"));
                        kafkaConsumers.add(consumer);
                }
                ExecutorService executorService = 
Executors.newFixedThreadPool(3);
                for (int i = 0; i < 30; i++) {
                        final KafkaConsumer<String, String> consumer = 
kafkaConsumers.get(i % kafkaConsumers.size());
                        executorService.execute(new Runnable() {
                                @Override
                                public void run() {
                                        
System.out.println(Thread.currentThread().getName() + "  start get message , 
consumer: " + consumer.toString());
                                        
System.out.println(consumer.assignment());
                                        ConsumerRecords<String, String> records 
= consumer.poll(200);
                                        
System.out.println(Thread.currentThread().getName() + "  poll end, consumer: " 
+ consumer.toString());
                                        for (ConsumerRecord<String, String> 
record : records) {
                                                System.out.printf("offset = %d, 
pattition = %d, key = %s, value = %s", record.offset(),record.partition(), 
record.key(), record.value());
                                                
System.out.println(Thread.currentThread().getName());
                                                consumer.commitSync();
                                                try {
                                                        Thread.sleep(2000);
                                                }
                                                catch (InterruptedException e) {
                                                        e.printStackTrace();
                                                }
                                        }
                                }
                        });
                }
        }

  was:
When I poll for messages I set a timeout, but the call stays blocked in the 
poll method. My Kafka cluster is healthy.
While debugging, I found that the thread is stuck in the following 
ConsumerNetworkClient.poll method:
    /**
     * Block indefinitely until the given request future has finished.
     * @param future The request future to await.
     * @throws WakeupException if {@link #wakeup()} is called from another 
thread
     * @throws InterruptException if the calling thread is interrupted
     */
    public void poll(RequestFuture<?> future) {
        while (!future.isDone())
            poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future);
    }
I don't know the reason for this; could you please take a look? Thank you.

Java Code:
        public static Properties getProperties(String groupId, String 
brokerList) {
                Properties properties = new Properties();
                properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerList);
                properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
                properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
OffsetResetStrategy.EARLIEST.name().toLowerCase());
                properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
org.apache.kafka.common.serialization.StringDeserializer.class.getName());
                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
org.apache.kafka.common.serialization.StringDeserializer.class.getName());
                properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
String.valueOf(false));
                return properties;
        }

public static void main(String[] args) throws InterruptedException, 
JMSException {
                List<KafkaConsumer<String, String>> kafkaConsumers = new 
ArrayList<>();
                for(int i = 0 ; i < 10 ; i++){
                        Properties props = 
getProperties("34620a79-f68b-4129-88f1-0e96bb14194e", "*.*.*.*:9092");
                        KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(props);
                        consumer.subscribe(Collections.singletonList("test"));
                        kafkaConsumers.add(consumer);
                }
                ExecutorService executorService = 
Executors.newFixedThreadPool(3);
                for (int i = 0; i < 30; i++) {
                        final KafkaConsumer<String, String> consumer = 
kafkaConsumers.get(i % kafkaConsumers.size());
                        executorService.execute(new Runnable() {
                                @Override
                                public void run() {
                                        
System.out.println(Thread.currentThread().getName() + "  start get message , 
consumer: " + consumer.toString());
                                        
System.out.println(consumer.assignment());
                                        ConsumerRecords<String, String> records 
= consumer.poll(200);
                                        
System.out.println(Thread.currentThread().getName() + "  poll end, consumer: " 
+ consumer.toString());
                                        for (ConsumerRecord<String, String> 
record : records) {
                                                System.out.printf("offset = %d, 
pattition = %d, key = %s, value = %s", record.offset(),record.partition(), 
record.key(), record.value());
                                                
System.out.println(Thread.currentThread().getName());
                                                consumer.commitSync();
                                                try {
                                                        Thread.sleep(2000);
                                                }
                                                catch (InterruptedException e) {
                                                        e.printStackTrace();
                                                }
                                        }
                                }
                        });
                }
        }


> The kafka Consumer can not poll the message by multi-thread
> -----------------------------------------------------------
>
>                 Key: KAFKA-5760
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5760
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>            Reporter: Julius
>
> When I poll for messages I set a timeout, but the call stays blocked in the 
> poll method. My Kafka cluster is healthy, and the test topic has 30 
> partitions.
> While debugging, I found that the thread is stuck in the following 
> ConsumerNetworkClient.poll method:
>     /**
>      * Block indefinitely until the given request future has finished.
>      * @param future The request future to await.
>      * @throws WakeupException if {@link #wakeup()} is called from another 
> thread
>      * @throws InterruptException if the calling thread is interrupted
>      */
>     public void poll(RequestFuture<?> future) {
>         while (!future.isDone())
>             poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future);
>     }
> I don't know the reason for this; could you please take a look? Thank you.
> Java Code:
>       public static Properties getProperties(String groupId, String 
> brokerList) {
>               Properties properties = new Properties();
>               properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
>               properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> brokerList);
>               properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
>               properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
> OffsetResetStrategy.EARLIEST.name().toLowerCase());
>               properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> org.apache.kafka.common.serialization.StringDeserializer.class.getName());
>               properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> org.apache.kafka.common.serialization.StringDeserializer.class.getName());
>               properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
> String.valueOf(false));
>               return properties;
>       }
> public static void main(String[] args) throws InterruptedException, 
> JMSException {
>               List<KafkaConsumer<String, String>> kafkaConsumers = new 
> ArrayList<>();
>               for(int i = 0 ; i < 10 ; i++){
>                       Properties props = 
> getProperties("34620a79-f68b-4129-88f1-0e96bb14194e", "*.*.*.*:9092");
>                       KafkaConsumer<String, String> consumer = new 
> KafkaConsumer<>(props);
>                       consumer.subscribe(Collections.singletonList("test"));
>                       kafkaConsumers.add(consumer);
>               }
>               ExecutorService executorService = 
> Executors.newFixedThreadPool(3);
>               for (int i = 0; i < 30; i++) {
>                       final KafkaConsumer<String, String> consumer = 
> kafkaConsumers.get(i % kafkaConsumers.size());
>                       executorService.execute(new Runnable() {
>                               @Override
>                               public void run() {
>                                       
> System.out.println(Thread.currentThread().getName() + "  start get message , 
> consumer: " + consumer.toString());
>                                       
> System.out.println(consumer.assignment());
>                                       ConsumerRecords<String, String> records 
> = consumer.poll(200);
>                                       
> System.out.println(Thread.currentThread().getName() + "  poll end, consumer: 
> " + consumer.toString());
>                                       for (ConsumerRecord<String, String> 
> record : records) {
>                                               System.out.printf("offset = %d, 
> pattition = %d, key = %s, value = %s", record.offset(),record.partition(), 
> record.key(), record.value());
>                                               
> System.out.println(Thread.currentThread().getName());
>                                               consumer.commitSync();
>                                               try {
>                                                       Thread.sleep(2000);
>                                               }
>                                               catch (InterruptedException e) {
>                                                       e.printStackTrace();
>                                               }
>                                       }
>                               }
>                       });
>               }
>       }



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to