[ 
https://issues.apache.org/jira/browse/KAFKA-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Hong updated KAFKA-4832:
-----------------------------
    Description: 
1.When I tried to send msg by Async with wrong IP in loop 1400times 10batches.
2.I use javaapi.kafkaproducer designed by Factory.
3.1 of 10 batches I take Producer.Connected() and Producer.Closed().
4.I know I send msg to a wrong IP finally, But I noticed the terminal was 
blocking. It can't close normally.
function just like that :
        public static void go(int s) throws Exception {
                KafkaService kf = new KafkaServiceImpl();//init properties
                for (int i = 0; i < 1400; i++) {
                        String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + 
i;
                        System.out.println(msg);
                        kf.push(msg); //producer.send()
                }
                kf.closeProducerFactory();//producer.closed()
                System.out.println(s);
                Thread.sleep(1000);
        }
kf.closeProducerFactory() is used by producer.closed(),
But Async send was always waiting for kafka server .I gave it a wrong IP.
I think it waits for long time Will bring problem with whole system.it occupy 
resources.


And another problem was I sending kafka msg with true IP and Runnable 
,Threadpools, all is right .Also use ↑ examples for loop.
It take error that said wait for 3 tries.
I also configered 
advertised.host.name=xxx.xxx.xxx.xxx
advertised.port=9092

Now I think it maybe cannot get so much concurrent volume in a time.
Our System is  over 1000tps.

Thank you .


Resource Code part:

package kafka.baseconfig;

import java.util.Properties;

import com.travelsky.util.ConFile;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * kafka工厂模式
 * 
 * 1.替代Producer方法.//多线程效率不适合.
 * 2.使用三部: 
 * ProducerFactory fac = new ProducerFactory();
 * fac.openProducer(); ->初始化对象
 * fac.push(msg); ->发消息主体
 * fac.closeProducer(); ->关闭对象
 * @author 王宏
 *
 */
public class ProducerFactory {
        protected Producer<String, String> producer = null;
        protected ConFile conf = null;
        private Properties props = new Properties();
        private String topic = null;

        {
                try {
                        conf = new ConFile("KafkaProperties.conf");
                        topic = conf.getString("topic");
                        if (conf == null) {
                                throw new Exception("kafka配置文件有问题");
                        }
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
        
        /**
         * 发送消息方法
         * @param msg
         */
        public void push(String msg) {
                if (producer == null) {
                        throw new RuntimeException("producer实例为空");
                }
                KeyedMessage<String, String> messageForSend = new 
KeyedMessage<String, String>(topic, msg);
                producer.send(messageForSend);
        }
        
        /**
         * 打开生产者
         */
        public void openProducer() {
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                props.put("metadata.broker.list", 
conf.getString("kafkaserverurl"));
                // 异步发送
                props.put("producer.type", conf.getString("synctype"));
                // 每次发送多少条
                props.put("batch.num.messages", conf.getString("batchmsgnums"));
                
                //
                props.put("request.required.acks", "1");
                //
                props.put("queue.enqueue.timeout.ms", "1");
                //
                props.put("request.timeout.ms", "1");
                //
                props.put("timeout.ms", "1");
                //
                props.put("reconnect.backoff.ms", "1");
                //
                props.put("retry.backoff.ms", "1");
                //
                props.put("message.send.max.retries", "1");
                //
                props.put("retry.backoff.ms", "1");
                //
                props.put("linger.ms", "1");
                //
                props.put("max.block.ms", "1");
                //
                props.put("metadata.fetch.timeout.ms", "1");
                //
                props.put("metadata.max.age.ms", "1");
                //
                props.put("metrics.sample.window.ms ", "1");
                producer = new Producer<String, String>(new 
ProducerConfig(props));
                if (producer == null) {
                        throw new RuntimeException("kafka producer 打开失败");
                }
        }

        /**
         * 关闭生产对象
         */
        public void closeProducer() {
                if (producer != null) {
                        producer.close();
                }
        }

        /**
         * 判断producer是否开启
         * @return
         */
        public boolean isOpenProduer() {
                return producer != null;
        }

}

package kafka.service.impl;

import kafka.baseconfig.ProducerFactory;
import kafka.service.KafkaService;

public class KafkaServiceImpl implements KafkaService {
        private ProducerFactory factory = null;
        
        public KafkaServiceImpl() {
                factory = new ProducerFactory();
                factory.openProducer();
        }
        
        /**
         * 往卡呼卡灌装数据并且可以修改topic
         * @param msg 数据
         * @param topic 发送的主题
         * 
         * @Deprecated 這個方法已經過期.不建議使用.
         */
        @Override
        @Deprecated
        public void push(String msg) throws Exception {
                //new Producer(msg).start();
                if (factory.isOpenProduer()) {
                        factory.push(msg);
                }else {
                        throw new RuntimeException("factory沒有初始化");
                }
        }
        
        /**
         * 过期方法
         * 
         * @param msg
         * @param topic
         * @throws Exception
         */
        @Override
        public void push(String msg, String topic) throws Exception {
                //new Producer(msg, topic).start();
                if (factory.isOpenProduer()) {
                        factory.push(msg);
                }else {
                        throw new RuntimeException("factory沒有初始化");
                }
        }
        
        /**
         * 释放资源.
         */
        @Override
        public void closeProducerFactory()throws Exception{
                if (factory.isOpenProduer()) {
                        factory.closeProducer();
                }
        }
}

public static void main(String[] args) throws Exception {
                long l = System.currentTimeMillis();
                for (int i = 0; i < 10; i++) {
                        go(i);
                }
                System.out.println(System.currentTimeMillis() - l);
}
        public static void go(int s) throws Exception {
                for (int i = 0; i < 1400; i++) {
                        KafkaService kf = new KafkaServiceImpl();
                        String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + 
i;
                        System.out.println(msg);
                        kf.push(msg);
                        kf.closeProducerFactory();
                }
                System.out.println(s);
                Thread.sleep(1000);
        }

  was:
1.When I tried to send msg by Async with wrong IP in loop 1400times 10batches.
2.I use javaapi.kafkaproducer designed by Factory.
3.1 of 10 batches I take Producer.Connected() and Producer.Closed().
4.I know I send msg to a wrong IP finally, But I noticed the terminal was 
blocking. It can't close normally.
function just like that :
        public static void go(int s) throws Exception {
                KafkaService kf = new KafkaServiceImpl();//init properties
                for (int i = 0; i < 1400; i++) {
                        String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + 
i;
                        System.out.println(msg);
                        kf.push(msg); //producer.send()
                }
                kf.closeProducerFactory();//producer.closed()
                System.out.println(s);
                Thread.sleep(1000);
        }
kf.closeProducerFactory() is used by producer.closed(),
But Async send was always waiting for kafka server .I gave it a wrong IP.
I think it waits for long time Will bring problem with whole system.it occupy 
resources.


And another problem was I sending kafka msg with true IP and Runnable 
,Threadpools, all is right .


> kafka producer send Async message to the wrong IP cannot to stop 
> producer.close()
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-4832
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4832
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.2.2
>         Environment: JDK8 Eclipse Mars Win7
>            Reporter: Wang Hong
>             Fix For: 0.8.2.2
>
>
> 1.When I tried to send msg by Async with wrong IP in loop 1400times 10batches.
> 2.I use javaapi.kafkaproducer designed by Factory.
> 3.1 of 10 batches I take Producer.Connected() and Producer.Closed().
> 4.I know I send msg to a wrong IP finally, But I noticed the terminal was 
> blocking. It can't close normally.
> function just like that :
>       public static void go(int s) throws Exception {
>                 KafkaService kf = new KafkaServiceImpl();//init properties
>               for (int i = 0; i < 1400; i++) {
>                       String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + 
> i;
>                       System.out.println(msg);
>                       kf.push(msg); //producer.send()
>               }
>                 kf.closeProducerFactory();//producer.closed()
>               System.out.println(s);
>               Thread.sleep(1000);
>       }
> kf.closeProducerFactory() is used by producer.closed(),
> But Async send was always waiting for kafka server .I gave it a wrong IP.
> I think it waits for long time Will bring problem with whole system.it occupy 
> resources.
> And another problem was I sending kafka msg with true IP and Runnable 
> ,Threadpools, all is right .Also use ↑ examples for loop.
> It take error that said wait for 3 tries.
> I also configered 
> advertised.host.name=xxx.xxx.xxx.xxx
> advertised.port=9092
> Now I think it maybe cannot get so much concurrent volume in a time.
> Our System is  over 1000tps.
> Thank you .
> Resource Code part:
> package kafka.baseconfig;
> import java.util.Properties;
> import com.travelsky.util.ConFile;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
> /**
>  * kafka工厂模式
>  * 
>  * 1.替代Producer方法.//多线程效率不适合.
>  * 2.使用三部: 
>  * ProducerFactory fac = new ProducerFactory();
>  * fac.openProducer(); ->初始化对象
>  * fac.push(msg); ->发消息主体
>  * fac.closeProducer(); ->关闭对象
>  * @author 王宏
>  *
>  */
> public class ProducerFactory {
>       protected Producer<String, String> producer = null;
>       protected ConFile conf = null;
>       private Properties props = new Properties();
>       private String topic = null;
>       {
>               try {
>                       conf = new ConFile("KafkaProperties.conf");
>                       topic = conf.getString("topic");
>                       if (conf == null) {
>                               throw new Exception("kafka配置文件有问题");
>                       }
>               } catch (Exception e) {
>                       e.printStackTrace();
>               }
>       }
>       
>       /**
>        * 发送消息方法
>        * @param msg
>        */
>       public void push(String msg) {
>               if (producer == null) {
>                       throw new RuntimeException("producer实例为空");
>               }
>               KeyedMessage<String, String> messageForSend = new 
> KeyedMessage<String, String>(topic, msg);
>               producer.send(messageForSend);
>       }
>       
>       /**
>        * 打开生产者
>        */
>       public void openProducer() {
>               props.put("serializer.class", "kafka.serializer.StringEncoder");
>               props.put("metadata.broker.list", 
> conf.getString("kafkaserverurl"));
>               // 异步发送
>               props.put("producer.type", conf.getString("synctype"));
>               // 每次发送多少条
>               props.put("batch.num.messages", conf.getString("batchmsgnums"));
>               
>               //
>               props.put("request.required.acks", "1");
>               //
>               props.put("queue.enqueue.timeout.ms", "1");
>               //
>               props.put("request.timeout.ms", "1");
>               //
>               props.put("timeout.ms", "1");
>               //
>               props.put("reconnect.backoff.ms", "1");
>               //
>               props.put("retry.backoff.ms", "1");
>               //
>               props.put("message.send.max.retries", "1");
>               //
>               props.put("retry.backoff.ms", "1");
>               //
>               props.put("linger.ms", "1");
>               //
>               props.put("max.block.ms", "1");
>               //
>               props.put("metadata.fetch.timeout.ms", "1");
>               //
>               props.put("metadata.max.age.ms", "1");
>               //
>               props.put("metrics.sample.window.ms ", "1");
>               producer = new Producer<String, String>(new 
> ProducerConfig(props));
>               if (producer == null) {
>                       throw new RuntimeException("kafka producer 打开失败");
>               }
>       }
>       /**
>        * 关闭生产对象
>        */
>       public void closeProducer() {
>               if (producer != null) {
>                       producer.close();
>               }
>       }
>       /**
>        * 判断producer是否开启
>        * @return
>        */
>       public boolean isOpenProduer() {
>               return producer != null;
>       }
> }
> package kafka.service.impl;
> import kafka.baseconfig.ProducerFactory;
> import kafka.service.KafkaService;
> public class KafkaServiceImpl implements KafkaService {
>       private ProducerFactory factory = null;
>       
>       public KafkaServiceImpl() {
>               factory = new ProducerFactory();
>               factory.openProducer();
>       }
>       
>       /**
>        * 往卡呼卡灌装数据并且可以修改topic
>        * @param msg 数据
>        * @param topic 发送的主题
>        * 
>        * @Deprecated 這個方法已經過期.不建議使用.
>        */
>       @Override
>       @Deprecated
>       public void push(String msg) throws Exception {
>               //new Producer(msg).start();
>               if (factory.isOpenProduer()) {
>                       factory.push(msg);
>               }else {
>                       throw new RuntimeException("factory沒有初始化");
>               }
>       }
>       
>       /**
>        * 过期方法
>        * 
>        * @param msg
>        * @param topic
>        * @throws Exception
>        */
>       @Override
>       public void push(String msg, String topic) throws Exception {
>               //new Producer(msg, topic).start();
>               if (factory.isOpenProduer()) {
>                       factory.push(msg);
>               }else {
>                       throw new RuntimeException("factory沒有初始化");
>               }
>       }
>       
>       /**
>        * 释放资源.
>        */
>       @Override
>       public void closeProducerFactory()throws Exception{
>               if (factory.isOpenProduer()) {
>                       factory.closeProducer();
>               }
>       }
> }
> public static void main(String[] args) throws Exception {
>               long l = System.currentTimeMillis();
>               for (int i = 0; i < 10; i++) {
>                       go(i);
>               }
>               System.out.println(System.currentTimeMillis() - l);
> }
>       public static void go(int s) throws Exception {
>               for (int i = 0; i < 1400; i++) {
>                       KafkaService kf = new KafkaServiceImpl();
>                       String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + 
> i;
>                       System.out.println(msg);
>                       kf.push(msg);
>                       kf.closeProducerFactory();
>               }
>               System.out.println(s);
>               Thread.sleep(1000);
>       }



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to