I am trying to create a very simple (new) consumer in Java using the
trunk:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group1");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Collections.singletonList("t1"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(String.format(
                    "topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value()));
        }
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}
This code raises the following exception when polling:
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 160817, only 30 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
    at NewConsumerLoop.main(NewConsumerLoop.java:97)
Can anyone spot what the issue is? Thanks.
Regards,
--Vahid Hashemian