[
https://issues.apache.org/jira/browse/KAFKA-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
aarti gupta updated KAFKA-1632:
-------------------------------
Description:
The following error is thrown when I call KafkaStream.head(), as shown in
the code snippet below:
WARN - java.lang.NoSuchMethodError:
kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;
My use case is that I want to block on the receive() method and, when anything
is published on the topic, return the 'head' of the queue to the calling method,
which processes it.
I do not use partitioning and have a single stream.
import com.google.common.collect.Maps;
import x.x.x.Task;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
/**
* @author agupta
*/
public class KafkaConsumerDelegate implements ConsumerDelegate {
private ConsumerConnector consumerConnector;
private String topicName;
private static Logger LOG =
LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
private final Map<String, Integer> topicCount = Maps.newHashMap();
private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
private List<KafkaStream<byte[], byte[]>> kafkaStreams;
@Override
public Task receive(final boolean consumerConfirms) {
try {
LOG.info("Kafka consumer delegate listening on topic " +
getTopicName());
kafkaStreams = messageStreams.get(getTopicName());
final KafkaStream<byte[], byte[]> kafkaStream = kafkaStreams.get(0);
return Executors.newSingleThreadExecutor().submit(new
Callable<Task>() {
@Override
public Task call() throws Exception {
final MessageAndMetadata<byte[], byte[]>
messageAndMetadata= kafkaStream.head();
final Task message = new Task() {
@Override
public byte[] getBytes() {
return messageAndMetadata.message();
}
};
return message;
}
}).get();
} catch (Exception e) {
LOG.warn("Error in consumer " + e.getMessage());
}
return null;
}
@Override
public void initialize(JSONObject configData, boolean publisherAckMode)
throws IOException {
try {
this.topicName = configData.getString("topicName");
LOG.info("Topic name is " + topicName);
} catch (JSONException e) {
e.printStackTrace();
LOG.error("Error parsing configuration", e);
}
Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181");
properties.put("group.id", "testgroup");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
//only one stream, and one topic, (Since we are not supporting
partitioning)
topicCount.put(getTopicName(), 1);
consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
messageStreams = consumerConnector.createMessageStreams(topicCount);
}
@Override
public void stop() throws IOException {
//TODO
throw new UnsupportedOperationException("Method Not Implemented");
}
public String getTopicName() {
return this.topicName;
}
}
Lastly, I am using the following binary
kafka_2.8.0-0.8.1.1
and the following maven dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
</dependency>
Any suggestions?
Thanks
aarti
was:
Hi Kafka team,
We use Kafka to send messages in a high-volume, memory-hungry application which
uses Parallel GC. We send messages at the rate of 12500/min in the first few
hours and then the number of messages drops down to 6000/min. Our application
usually runs for a maximum of 24 hours.
What we have:
1) When we do not send messages through Kafka Producer 0.8, then our
application never slows down much and our entire process completes within 24
hours
2) When we use Kafka, our machines slow down in sending messages to around
2500/min and as time progresses, the number of messages being sent is even
lower
3) We suspect that our application spends more time in GC and hence the
problem. The Heap Dump does not contain a leak suspect with Kafka, but this
slowness happens only when Kafka messaging system is used.
Any pointers that could help us resolve this issue will be highly appreciated.
> No such method error on KafkaStream.head
> ----------------------------------------
>
> Key: KAFKA-1632
> URL: https://issues.apache.org/jira/browse/KAFKA-1632
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.0
> Reporter: aarti gupta
> Priority: Critical
>
> The following error is thrown when I call KafkaStream.head(), as shown in
> the code snippet below:
> WARN - java.lang.NoSuchMethodError:
> kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;
> My use case is that I want to block on the receive() method and, when
> anything is published on the topic, return the 'head' of the queue to the
> calling method, which processes it.
> I do not use partitioning and have a single stream.
> import com.google.common.collect.Maps;
> import x.x.x.Task;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.consumer.ZookeeperConsumerConnector;
> import kafka.message.MessageAndMetadata;
> import org.codehaus.jettison.json.JSONException;
> import org.codehaus.jettison.json.JSONObject;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.io.IOException;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.Callable;
> import java.util.concurrent.Executors;
> /**
> * @author agupta
> */
> public class KafkaConsumerDelegate implements ConsumerDelegate {
> private ConsumerConnector consumerConnector;
> private String topicName;
> private static Logger LOG =
> LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
> private final Map<String, Integer> topicCount = Maps.newHashMap();
> private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
> private List<KafkaStream<byte[], byte[]>> kafkaStreams;
> @Override
> public Task receive(final boolean consumerConfirms) {
> try {
> LOG.info("Kafka consumer delegate listening on topic " +
> getTopicName());
> kafkaStreams = messageStreams.get(getTopicName());
> final KafkaStream<byte[], byte[]> kafkaStream =
> kafkaStreams.get(0);
> return Executors.newSingleThreadExecutor().submit(new
> Callable<Task>() {
> @Override
> public Task call() throws Exception {
> final MessageAndMetadata<byte[], byte[]>
> messageAndMetadata= kafkaStream.head();
> final Task message = new Task() {
> @Override
> public byte[] getBytes() {
> return messageAndMetadata.message();
> }
> };
> return message;
> }
> }).get();
> } catch (Exception e) {
> LOG.warn("Error in consumer " + e.getMessage());
> }
> return null;
> }
> @Override
> public void initialize(JSONObject configData, boolean publisherAckMode)
> throws IOException {
> try {
> this.topicName = configData.getString("topicName");
> LOG.info("Topic name is " + topicName);
> } catch (JSONException e) {
> e.printStackTrace();
> LOG.error("Error parsing configuration", e);
> }
> Properties properties = new Properties();
> properties.put("zookeeper.connect", "localhost:2181");
> properties.put("group.id", "testgroup");
> ConsumerConfig consumerConfig = new ConsumerConfig(properties);
> //only one stream, and one topic, (Since we are not supporting
> partitioning)
> topicCount.put(getTopicName(), 1);
> consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
> messageStreams = consumerConnector.createMessageStreams(topicCount);
> }
> @Override
> public void stop() throws IOException {
> //TODO
> throw new UnsupportedOperationException("Method Not Implemented");
> }
> public String getTopicName() {
> return this.topicName;
> }
> }
> Lastly, I am using the following binary
> kafka_2.8.0-0.8.1.1
> and the following maven dependency
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka_2.10</artifactId>
> <version>0.8.1.1</version>
> </dependency>
> Any suggestions?
> Thanks
> aarti
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)