[
https://issues.apache.org/jira/browse/KAFKA-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
aarti gupta updated KAFKA-1632:
-------------------------------
Description:
The following error is thrown when I call KafkaStream.head(), as shown in
the code snippet below:
WARN - java.lang.NoSuchMethodError:
kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;
My use case is that I want to block on the receive() method and, when a
message is published on the topic, return the 'head' of the queue to the calling
method, which processes it.
I do not use partitioning and have a single stream.
import com.google.common.collect.Maps;
import x.x.x.Task;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
/**
* @author agupta
*/
public class KafkaConsumerDelegate implements ConsumerDelegate {
private ConsumerConnector consumerConnector;
private String topicName;
private static Logger LOG =
LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
private final Map<String, Integer> topicCount = Maps.newHashMap();
private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
private List<KafkaStream<byte[], byte[]>> kafkaStreams;
@Override
public Task receive(final boolean consumerConfirms) {
try {
LOG.info("Kafka consumer delegate listening on topic " +
getTopicName());
kafkaStreams = messageStreams.get(getTopicName());
final KafkaStream<byte[], byte[]> kafkaStream = kafkaStreams.get(0);
return Executors.newSingleThreadExecutor().submit(new
Callable<Task>() {
@Override
public Task call() throws Exception {
final MessageAndMetadata<byte[], byte[]>
messageAndMetadata= kafkaStream.head();
final Task message = new Task() {
@Override
public byte[] getBytes() {
return messageAndMetadata.message();
}
};
return message;
}
}).get();
} catch (Exception e) {
LOG.warn("Error in consumer " + e.getMessage());
}
return null;
}
@Override
public void initialize(JSONObject configData, boolean publisherAckMode)
throws IOException {
try {
this.topicName = configData.getString("topicName");
LOG.info("Topic name is " + topicName);
} catch (JSONException e) {
e.printStackTrace();
LOG.error("Error parsing configuration", e);
}
Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181");
properties.put("group.id", "testgroup");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
//only one stream, and one topic, (Since we are not supporting
partitioning)
topicCount.put(getTopicName(), 1);
consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
messageStreams = consumerConnector.createMessageStreams(topicCount);
}
@Override
public void stop() throws IOException {
//TODO
throw new UnsupportedOperationException("Method Not Implemented");
}
public String getTopicName() {
return this.topicName;
}
}
Lastly, I am using the following binary
kafka_2.8.0-0.8.1.1
and the following maven dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
</dependency>
Any suggestions?
Thanks
aarti
was:
The following error is thrown when I call KafkaStream.head(), as shown in
the code snippet below:
WARN - java.lang.NoSuchMethodError:
kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;
My use case is that I want to block on the receive() method and, when anything
is published on the topic, return the 'head' of the queue to the calling method,
which processes it.
I do not use partitioning and have a single stream.
import com.google.common.collect.Maps;
import x.x.x.Task;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
/**
* @author agupta
*/
public class KafkaConsumerDelegate implements ConsumerDelegate {
private ConsumerConnector consumerConnector;
private String topicName;
private static Logger LOG =
LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
private final Map<String, Integer> topicCount = Maps.newHashMap();
private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
private List<KafkaStream<byte[], byte[]>> kafkaStreams;
@Override
public Task receive(final boolean consumerConfirms) {
try {
LOG.info("Kafka consumer delegate listening on topic " +
getTopicName());
kafkaStreams = messageStreams.get(getTopicName());
final KafkaStream<byte[], byte[]> kafkaStream = kafkaStreams.get(0);
return Executors.newSingleThreadExecutor().submit(new
Callable<Task>() {
@Override
public Task call() throws Exception {
final MessageAndMetadata<byte[], byte[]>
messageAndMetadata= kafkaStream.head();
final Task message = new Task() {
@Override
public byte[] getBytes() {
return messageAndMetadata.message();
}
};
return message;
}
}).get();
} catch (Exception e) {
LOG.warn("Error in consumer " + e.getMessage());
}
return null;
}
@Override
public void initialize(JSONObject configData, boolean publisherAckMode)
throws IOException {
try {
this.topicName = configData.getString("topicName");
LOG.info("Topic name is " + topicName);
} catch (JSONException e) {
e.printStackTrace();
LOG.error("Error parsing configuration", e);
}
Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181");
properties.put("group.id", "testgroup");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
//only one stream, and one topic, (Since we are not supporting
partitioning)
topicCount.put(getTopicName(), 1);
consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
messageStreams = consumerConnector.createMessageStreams(topicCount);
}
@Override
public void stop() throws IOException {
//TODO
throw new UnsupportedOperationException("Method Not Implemented");
}
public String getTopicName() {
return this.topicName;
}
}
Lastly, I am using the following binary
kafka_2.8.0-0.8.1.1
and the following maven dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
</dependency>
Any suggestions?
Thanks
aarti
> No such method error on KafkaStream.head
> ----------------------------------------
>
> Key: KAFKA-1632
> URL: https://issues.apache.org/jira/browse/KAFKA-1632
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.0
> Reporter: aarti gupta
>
> The following error is thrown when I call KafkaStream.head(), as shown in
> the code snippet below:
> WARN - java.lang.NoSuchMethodError:
> kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;
> My use case is that I want to block on the receive() method and, when a
> message is published on the topic, return the 'head' of the queue to the
> calling method, which processes it.
> I do not use partitioning and have a single stream.
> import com.google.common.collect.Maps;
> import x.x.x.Task;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.consumer.ZookeeperConsumerConnector;
> import kafka.message.MessageAndMetadata;
> import org.codehaus.jettison.json.JSONException;
> import org.codehaus.jettison.json.JSONObject;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.io.IOException;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.Callable;
> import java.util.concurrent.Executors;
> /**
> * @author agupta
> */
> public class KafkaConsumerDelegate implements ConsumerDelegate {
> private ConsumerConnector consumerConnector;
> private String topicName;
> private static Logger LOG =
> LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
> private final Map<String, Integer> topicCount = Maps.newHashMap();
> private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
> private List<KafkaStream<byte[], byte[]>> kafkaStreams;
> @Override
> public Task receive(final boolean consumerConfirms) {
> try {
> LOG.info("Kafka consumer delegate listening on topic " +
> getTopicName());
> kafkaStreams = messageStreams.get(getTopicName());
> final KafkaStream<byte[], byte[]> kafkaStream =
> kafkaStreams.get(0);
> return Executors.newSingleThreadExecutor().submit(new
> Callable<Task>() {
> @Override
> public Task call() throws Exception {
> final MessageAndMetadata<byte[], byte[]>
> messageAndMetadata= kafkaStream.head();
> final Task message = new Task() {
> @Override
> public byte[] getBytes() {
> return messageAndMetadata.message();
> }
> };
> return message;
> }
> }).get();
> } catch (Exception e) {
> LOG.warn("Error in consumer " + e.getMessage());
> }
> return null;
> }
> @Override
> public void initialize(JSONObject configData, boolean publisherAckMode)
> throws IOException {
> try {
> this.topicName = configData.getString("topicName");
> LOG.info("Topic name is " + topicName);
> } catch (JSONException e) {
> e.printStackTrace();
> LOG.error("Error parsing configuration", e);
> }
> Properties properties = new Properties();
> properties.put("zookeeper.connect", "localhost:2181");
> properties.put("group.id", "testgroup");
> ConsumerConfig consumerConfig = new ConsumerConfig(properties);
> //only one stream, and one topic, (Since we are not supporting
> partitioning)
> topicCount.put(getTopicName(), 1);
> consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
> messageStreams = consumerConnector.createMessageStreams(topicCount);
> }
> @Override
> public void stop() throws IOException {
> //TODO
> throw new UnsupportedOperationException("Method Not Implemented");
> }
> public String getTopicName() {
> return this.topicName;
> }
> }
> Lastly, I am using the following binary
> kafka_2.8.0-0.8.1.1
> and the following maven dependency
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka_2.10</artifactId>
> <version>0.8.1.1</version>
> </dependency>
> Any suggestions?
> Thanks
> aarti
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)