[ https://issues.apache.org/jira/browse/KAFKA-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wang Hong updated KAFKA-4832: ----------------------------- Description: 1.When I tried to send msg by Async with wrong IP in loop 1400times 10batches. 2.I use javaapi.kafkaproducer designed by Factory. 3.1 of 10 batches I take Producer.Connected() and Producer.Closed(). 4.I know I send msg to a wrong IP finally, But I noticed the terminal was blocking. It can't close normally. function just like that : public static void go(int s) throws Exception { KafkaService kf = new KafkaServiceImpl();//init properties for (int i = 0; i < 1400; i++) { String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + i; System.out.println(msg); kf.push(msg); //producer.send() } kf.closeProducerFactory();//producer.closed() System.out.println(s); Thread.sleep(1000); } kf.closeProducerFactory() is used by producer.closed(), But Async send was always waiting for kafka server .I gave it a wrong IP. I think it waits for long time Will bring problem with whole system.it occupy resources. And another problem was I sending kafka msg with true IP and Runnable ,Threadpools, all is right .Also use ↑ examples for loop. It take error that said wait for 3 tries. I also configered advertised.host.name=xxx.xxx.xxx.xxx advertised.port=9092 Now I think it maybe cannot get so much concurrent volume in a time. Our System is over 1000tps. Thank you . Resource Code part: package kafka.baseconfig; import java.util.Properties; import com.travelsky.util.ConFile; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * kafka工厂模式 * * 1.替代Producer方法.//多线程效率不适合. * 2.使用三部: * ProducerFactory fac = new ProducerFactory(); * fac.openProducer(); ->初始化对象 * fac.push(msg); ->发消息主体 * fac.closeProducer(); ->关闭对象 * @author 王宏 * */ public class ProducerFactory { protected Producer<String, String> producer = null; protected ConFile conf = null; private Properties props = new Properties(); private String topic = null; { try { conf = new ConFile("KafkaProperties.conf"); topic = conf.getString("topic"); if (conf == null) { throw new Exception("kafka配置文件有问题"); } } catch (Exception e) { e.printStackTrace(); } } /** * 发送消息方法 * @param msg */ public void push(String msg) { if (producer == null) { throw new RuntimeException("producer实例为空"); } KeyedMessage<String, String> messageForSend = new KeyedMessage<String, String>(topic, msg); producer.send(messageForSend); } /** * 打开生产者 */ public void openProducer() { props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", conf.getString("kafkaserverurl")); // 异步发送 props.put("producer.type", conf.getString("synctype")); // 每次发送多少条 props.put("batch.num.messages", conf.getString("batchmsgnums")); // props.put("request.required.acks", "1"); // props.put("queue.enqueue.timeout.ms", "1"); // props.put("request.timeout.ms", "1"); // props.put("timeout.ms", "1"); // props.put("reconnect.backoff.ms", "1"); // props.put("retry.backoff.ms", "1"); // props.put("message.send.max.retries", "1"); // props.put("retry.backoff.ms", "1"); // props.put("linger.ms", "1"); // props.put("max.block.ms", "1"); // props.put("metadata.fetch.timeout.ms", "1"); // props.put("metadata.max.age.ms", "1"); // props.put("metrics.sample.window.ms ", "1"); producer = new Producer<String, String>(new ProducerConfig(props)); if (producer == null) { throw new RuntimeException("kafka producer 打开失败"); } } /** * 关闭生产对象 */ public void closeProducer() { if (producer != null) { producer.close(); } } /** * 判断producer是否开启 * @return */ public boolean isOpenProduer() { return producer != null; } } package kafka.service.impl; import kafka.baseconfig.ProducerFactory; import kafka.service.KafkaService; public class KafkaServiceImpl implements KafkaService { private ProducerFactory factory = null; public KafkaServiceImpl() { factory = new ProducerFactory(); factory.openProducer(); } /** * 往卡呼卡灌装数据并且可以修改topic * @param msg 数据 * @param topic 发送的主题 * * @Deprecated 這個方法已經過期.不建議使用. */ @Override @Deprecated public void push(String msg) throws Exception { //new Producer(msg).start(); if (factory.isOpenProduer()) { factory.push(msg); }else { throw new RuntimeException("factory沒有初始化"); } } /** * 过期方法 * * @param msg * @param topic * @throws Exception */ @Override public void push(String msg, String topic) throws Exception { //new Producer(msg, topic).start(); if (factory.isOpenProduer()) { factory.push(msg); }else { throw new RuntimeException("factory沒有初始化"); } } /** * 释放资源. */ @Override public void closeProducerFactory()throws Exception{ if (factory.isOpenProduer()) { factory.closeProducer(); } } } public static void main(String[] args) throws Exception { long l = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { go(i); } System.out.println(System.currentTimeMillis() - l); } public static void go(int s) throws Exception { for (int i = 0; i < 1400; i++) { KafkaService kf = new KafkaServiceImpl(); String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + i; System.out.println(msg); kf.push(msg); kf.closeProducerFactory(); } System.out.println(s); Thread.sleep(1000); } was: 1.When I tried to send msg by Async with wrong IP in loop 1400times 10batches. 2.I use javaapi.kafkaproducer designed by Factory. 3.1 of 10 batches I take Producer.Connected() and Producer.Closed(). 4.I know I send msg to a wrong IP finally, But I noticed the terminal was blocking. It can't close normally. function just like that : public static void go(int s) throws Exception { KafkaService kf = new KafkaServiceImpl();//init properties for (int i = 0; i < 1400; i++) { String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + i; System.out.println(msg); kf.push(msg); //producer.send() } kf.closeProducerFactory();//producer.closed() System.out.println(s); Thread.sleep(1000); } kf.closeProducerFactory() is used by producer.closed(), But Async send was always waiting for kafka server .I gave it a wrong IP. I think it waits for long time Will bring problem with whole system.it occupy resources. And another problem was I sending kafka msg with true IP and Runnable ,Threadpools, all is right . > kafka producer send Async message to the wrong IP cannot to stop > producer.close() > --------------------------------------------------------------------------------- > > Key: KAFKA-4832 > URL: https://issues.apache.org/jira/browse/KAFKA-4832 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 0.8.2.2 > Environment: JDK8 Eclipse Mars Win7 > Reporter: Wang Hong > Fix For: 0.8.2.2 > > > 1.When I tried to send msg by Async with wrong IP in loop 1400times 10batches. > 2.I use javaapi.kafkaproducer designed by Factory. > 3.1 of 10 batches I take Producer.Connected() and Producer.Closed(). > 4.I know I send msg to a wrong IP finally, But I noticed the terminal was > blocking. It can't close normally. > function just like that : > public static void go(int s) throws Exception { > KafkaService kf = new KafkaServiceImpl();//init properties > for (int i = 0; i < 1400; i++) { > String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + > i; > System.out.println(msg); > kf.push(msg); //producer.send() > } > kf.closeProducerFactory();//producer.closed() > System.out.println(s); > Thread.sleep(1000); > } > kf.closeProducerFactory() is used by producer.closed(), > But Async send was always waiting for kafka server .I gave it a wrong IP. > I think it waits for long time Will bring problem with whole system.it occupy > resources. > And another problem was I sending kafka msg with true IP and Runnable > ,Threadpools, all is right .Also use ↑ examples for loop. > It take error that said wait for 3 tries. > I also configered > advertised.host.name=xxx.xxx.xxx.xxx > advertised.port=9092 > Now I think it maybe cannot get so much concurrent volume in a time. > Our System is over 1000tps. > Thank you . > Resource Code part: > package kafka.baseconfig; > import java.util.Properties; > import com.travelsky.util.ConFile; > import kafka.javaapi.producer.Producer; > import kafka.producer.KeyedMessage; > import kafka.producer.ProducerConfig; > /** > * kafka工厂模式 > * > * 1.替代Producer方法.//多线程效率不适合. > * 2.使用三部: > * ProducerFactory fac = new ProducerFactory(); > * fac.openProducer(); ->初始化对象 > * fac.push(msg); ->发消息主体 > * fac.closeProducer(); ->关闭对象 > * @author 王宏 > * > */ > public class ProducerFactory { > protected Producer<String, String> producer = null; > protected ConFile conf = null; > private Properties props = new Properties(); > private String topic = null; > { > try { > conf = new ConFile("KafkaProperties.conf"); > topic = conf.getString("topic"); > if (conf == null) { > throw new Exception("kafka配置文件有问题"); > } > } catch (Exception e) { > e.printStackTrace(); > } > } > > /** > * 发送消息方法 > * @param msg > */ > public void push(String msg) { > if (producer == null) { > throw new RuntimeException("producer实例为空"); > } > KeyedMessage<String, String> messageForSend = new > KeyedMessage<String, String>(topic, msg); > producer.send(messageForSend); > } > > /** > * 打开生产者 > */ > public void openProducer() { > props.put("serializer.class", "kafka.serializer.StringEncoder"); > props.put("metadata.broker.list", > conf.getString("kafkaserverurl")); > // 异步发送 > props.put("producer.type", conf.getString("synctype")); > // 每次发送多少条 > props.put("batch.num.messages", conf.getString("batchmsgnums")); > > // > props.put("request.required.acks", "1"); > // > props.put("queue.enqueue.timeout.ms", "1"); > // > props.put("request.timeout.ms", "1"); > // > props.put("timeout.ms", "1"); > // > props.put("reconnect.backoff.ms", "1"); > // > props.put("retry.backoff.ms", "1"); > // > props.put("message.send.max.retries", "1"); > // > props.put("retry.backoff.ms", "1"); > // > props.put("linger.ms", "1"); > // > props.put("max.block.ms", "1"); > // > props.put("metadata.fetch.timeout.ms", "1"); > // > props.put("metadata.max.age.ms", "1"); > // > props.put("metrics.sample.window.ms ", "1"); > producer = new Producer<String, String>(new > ProducerConfig(props)); > if (producer == null) { > throw new RuntimeException("kafka producer 打开失败"); > } > } > /** > * 关闭生产对象 > */ > public void closeProducer() { > if (producer != null) { > producer.close(); > } > } > /** > * 判断producer是否开启 > * @return > */ > public boolean isOpenProduer() { > return producer != null; > } > } > package kafka.service.impl; > import kafka.baseconfig.ProducerFactory; > import kafka.service.KafkaService; > public class KafkaServiceImpl implements KafkaService { > private ProducerFactory factory = null; > > public KafkaServiceImpl() { > factory = new ProducerFactory(); > factory.openProducer(); > } > > /** > * 往卡呼卡灌装数据并且可以修改topic > * @param msg 数据 > * @param topic 发送的主题 > * > * @Deprecated 這個方法已經過期.不建議使用. > */ > @Override > @Deprecated > public void push(String msg) throws Exception { > //new Producer(msg).start(); > if (factory.isOpenProduer()) { > factory.push(msg); > }else { > throw new RuntimeException("factory沒有初始化"); > } > } > > /** > * 过期方法 > * > * @param msg > * @param topic > * @throws Exception > */ > @Override > public void push(String msg, String topic) throws Exception { > //new Producer(msg, topic).start(); > if (factory.isOpenProduer()) { > factory.push(msg); > }else { > throw new RuntimeException("factory沒有初始化"); > } > } > > /** > * 释放资源. > */ > @Override > public void closeProducerFactory()throws Exception{ > if (factory.isOpenProduer()) { > factory.closeProducer(); > } > } > } > public static void main(String[] args) throws Exception { > long l = System.currentTimeMillis(); > for (int i = 0; i < 10; i++) { > go(i); > } > System.out.println(System.currentTimeMillis() - l); > } > public static void go(int s) throws Exception { > for (int i = 0; i < 1400; i++) { > KafkaService kf = new KafkaServiceImpl(); > String msg = "a b 0a b c d e 0a b c d e 0" + s + "--" + > i; > System.out.println(msg); > kf.push(msg); > kf.closeProducerFactory(); > } > System.out.println(s); > Thread.sleep(1000); > } -- This message was sent by Atlassian JIRA (v6.3.15#6346)