[
https://issues.apache.org/jira/browse/KAFKA-1657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jay Kreps resolved KAFKA-1657.
------------------------------
Resolution: Won't Fix
I think the ultimate solution here is to move to the new consumer which should
fix all this.
> Fetch request using Simple consumer fails due to Leader not local for
> partition
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-1657
> URL: https://issues.apache.org/jira/browse/KAFKA-1657
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.1.1
> Reporter: aarti gupta
>
> I have a three-node Kafka cluster, running on the same physical machine (on
> different ports)
> with replication factor = 3, and a single topic with 3 partitions.
> Multiple producers write to the topic, and a custom partitioner is used to
> direct messages to a given partition.
> I use the simple consumer to read from a given partition of the topic, and
> have three instances of my consumer running
> The code snippet for the simple consumer suggests that having any node in
> the cluster (not necessarily the leader for that partition) is sufficient to
> find the leader for the partition; however, on running this, I find that,
> given a different node in the cluster, a null pointer exception is thrown,
> and the logs show the error
> [2014-09-28 20:40:20,984] WARN [KafkaApi-1] Fetch request with correlation id
> 0 from client testClient on partition [VCCTask,1] failed due to Leader not
> local for partition [VCCTask,1] on broker 1 (kafka.server.KafkaApis)
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic VCCTask
> Topic:VCCTask PartitionCount:3 ReplicationFactor:3 Configs:
> Topic: VCCTask Partition: 0 Leader: 1 Replicas: 2,3,1 Isr:
> 1,2,3
> Topic: VCCTask Partition: 1 Leader: 1 Replicas: 3,1,2 Isr:
> 1,2,3
> Topic: VCCTask Partition: 2 Leader: 1 Replicas: 1,2,3 Isr:
> 1,2,3
> If i specify the leader for the partition, instead of any node in the
> cluster, everything works great, but this is an operational nightmare.
> I was able to reproduce this using a simple test, where a producer writes
> numbers from 1 to 999999, and the consumers, consume from a specific
> partition.
> Here are the code snippets
> public class TestConsumerStoreOffsetZookeeper {
> public static void main(String[] args) throws JSONException {
> TestConsumerStoreOffsetZookeeper testConsumer = new
> TestConsumerStoreOffsetZookeeper();
> JSONObject jsonObject = new JSONObject();
> jsonObject.put("topicName", "VCCTask");
> jsonObject.put("clientName", "testClient");
> jsonObject.put("partition", args[0]);
> jsonObject.put("hostPort", "172.16.78.171");
> jsonObject.put("znodeName", "VCCTask");
> jsonObject.put("port", args[1]);
> testConsumer.initialize(jsonObject);
> final long startTime = System.currentTimeMillis();
> testConsumer.startReceiving(new FutureCallback<byte[]>() {
> int noOfMessagesConsumed= 0;
> @Override
> public void onSuccess(byte[] result) {
> LOG.info("YES!! " + ByteBuffer.wrap(result).getLong());
> ++noOfMessagesConsumed;
> LOG.info("# Messages consumed "+ noOfMessagesConsumed +" Time
> elapsed"+ (System.currentTimeMillis()-startTime )/1000 +" seconds");
> }
> @Override
> public void onFailure(Throwable t) {
> LOG.info("NO!! " + t.fillInStackTrace().getMessage());
> }
> });
> }
> private String topicToRead;
> private static Logger LOG =
> Logger.getLogger("TestConsumerStoreOffsetZookeeper");
> List<String> seedBrokers = Lists.newArrayList("localhost");
> private int port;
> private SimpleConsumer consumer;
> Integer partition;
> String clientName;
> private Broker currentLeader;
> private String counter;
> CuratorFramework zooKeeper;
> public void startReceiving(final FutureCallback<byte[]> futureCallback) {
> findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead,
> partition);
> LOG.info("Kafka consumer delegate listening on topic " + topicToRead
> + " and partition " + partition);
> int numErrors = 0;
> while (true) {
> long latestOffset = 0;
> Stat stat = null;
> final String path = "/" + topicToRead + "/"+partition;
> try {
> //************************Read top of the
> stat = zooKeeper.checkExists().forPath(path);
> if (stat == null) {
> latestOffset =
> getLastOffsetFromBeginningOfStream(this.consumer, topicToRead, partition,
> OffsetRequest.EarliestTime(), clientName);
> byte b[] = new byte[8];
> ByteBuffer byteBuffer = ByteBuffer.wrap(b);
> byteBuffer.putLong(latestOffset);
> final String s =
> zooKeeper.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
> LOG.info(" Zookeeper create string is "+ s);
> stat = zooKeeper.checkExists().forPath(path);
> if (stat == null) {
> LOG.info("Stat was null");
> throw new RuntimeException("Stat in zookeeper was
> null, cannot continue as message stream cannot be persisted");
> }
> } else {
> final byte[] data =
> zooKeeper.getData().storingStatIn(stat).forPath(path);
> if(data.length>0){
> latestOffset = ByteBuffer.wrap(data).getLong();
> }else {
> latestOffset =
> getLastOffsetFromBeginningOfStream(this.consumer,topicToRead,partition,OffsetRequest.EarliestTime(),clientName);
> }
> }
> } catch (Exception e) {
> throw new RuntimeException(e.fillInStackTrace().getMessage());
> }
> LOG.info("Topic name is " + topicToRead);
> LOG.info("Last offset is " + latestOffset);
> LOG.info("Constructing new fetch request on " + topicToRead + "
> from offset" + latestOffset);
> FetchRequest request = new
> FetchRequestBuilder().clientId(clientName).addFetch(topicToRead, partition,
> latestOffset, 100000).build();
> FetchResponse fetchResponse = consumer.fetch(request);
> if (fetchResponse.hasError()) {
> numErrors++;
> final short code = fetchResponse.errorCode(topicToRead,
> partition);
> LOG.info("Error fetching data from broker: " +
> consumer.host() + " Reason " + code);
> if (code == ErrorMapping.OffsetOutOfRangeCode()) {
> LOG.info("Offset out of range error: calculating offset
> again");
> throw new RuntimeException("Offset is out of range,
> multiple consumers are not allowed, this consumer will exit");
> }
> if (numErrors > 5 && code!=3) {
> consumer.close();
> consumer = null;
> findLeaderAndUpdateSelfPointers(seedBrokers, port,
> topicToRead, partition);
> numErrors = 0;
> }
> continue;
> }
> final ByteBufferMessageSet messageAndOffsets =
> fetchResponse.messageSet(topicToRead, partition);
> final int validBytes = messageAndOffsets.validBytes();
> LOG.info("Received fetch response on topic " + topicToRead + "
> from offset" + latestOffset + " fetch response valid bytes is " + validBytes);
> try {
> if (validBytes == 0) {
> LOG.info("No message received");
> //Don't keep hammering Kafka
> Thread.sleep(1000);
> continue;
> }
> for (MessageAndOffset messageAndOffset : messageAndOffsets) {
> LOG.info("Processing offset");
> final long currentOffset = messageAndOffset.offset();
> LOG.info("Processing offset " + currentOffset);
> //in case of compression entire block may be received
> if (currentOffset < latestOffset) {
> LOG.info("Found an old offset: " + currentOffset +
> "Expecting:" + latestOffset);
> continue;
> }
> final ByteBuffer payload =
> messageAndOffset.message().payload();
> byte[] bytes = new byte[payload.limit()];
> payload.get(bytes);
> LOG.info(this.getClass().getName() + " Received message
> from offset" + String.valueOf(latestOffset) + new String(bytes, "UTF-8"));
> LOG.info(this.getClass().getName() + " Executing future
> callback");
> //TODO ***************this should be atomic with writing
> job in db***********************
> futureCallback.onSuccess(bytes);
> try {
> long nextOffset = messageAndOffset.nextOffset();
> incrementOffset(nextOffset, stat, path);
> } catch (KeeperException | InterruptedException e) {
> LOG.info("Encountered exception in writing to" +
> e.fillInStackTrace().getMessage());
> }
>
> //****************************************************************************************
> }
> LOG.info("Outside for loop");
> } catch (Exception e1) {
> LOG.info("Error in processing message or running callback " +
> e1.getMessage());
> futureCallback.onFailure(e1);
> throw new RuntimeException(e1);
> }
> }
> }
> private void incrementOffset(long nextOffset, Stat stat, String path)
> throws Exception {
> if (stat == null) {
> throw new RuntimeException("Given stat was null");
> }
> byte b[] = new byte[8];
> ByteBuffer byteBuffer = ByteBuffer.wrap(b);
> byteBuffer.putLong(nextOffset);
> LOG.info("Offset consumed successfully: Setting offset in zookeeper
> as next offset: "+ nextOffset);
> final Stat statWrite = zooKeeper.setData().forPath(path, b);
> if(statWrite.getDataLength() ==0){
> throw new RuntimeException("Unable to save offset in zookeeper");
> }
> }
> //TODO: agupta adapters should not have an initialize method, rename and
> merge with startListening
> public void initialize(JSONObject configData) {
> try {
> final String hostPort = configData.getString("hostPort");
> zooKeeper = CuratorFrameworkFactory.newClient(hostPort,new
> ExponentialBackoffRetry(10, 3000));
> zooKeeper.start();
> this.counter = configData.getString("znodeName");
> this.topicToRead = configData.getString("topicName");
> LOG.info("Topic name is " + topicToRead);
> //TODO: agupta: read seedbrokers from zookeeper
> //*ZkClient zkClient = new ZkClient("localhost:2108", 4000, 6000,
> new BytesPushThroughSerializer());
> //List<String> brokerList = zkClient.getChildren("/brokers/ips");
> List<String> seedBrokers = Lists.newArrayList("localhost");
> this.seedBrokers = seedBrokers;
> this.port = configData.getInt("port");
> this.partition= configData.getInt("partition");
> this.clientName = configData.getString("clientName");
> LOG.info("Finding leader with for partition " + partition + "
> clientName " + clientName);
> } catch (JSONException | IOException e) {
> e.printStackTrace();
> LOG.info("Error parsing configuration" + e.getMessage());
> } catch (Exception e) {
> LOG.info("Error starting zookeeper" + e.getMessage());
> }
> }
> /**
> * Find last offset to define where to start reading if this is the first
> read
> *
> * @param consumer
> * @param topic
> * @param partition
> * @param whichTime
> * @param clientName
> * @return
> */
> public static long getLastOffsetFromBeginningOfStream(SimpleConsumer
> consumer, String topic, int partition,
> long whichTime,
> String clientName) {
> TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
> partition);
> Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new
> HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
> requestInfo.put(topicAndPartition, new
> PartitionOffsetRequestInfo(whichTime, 1));
> kafka.javaapi.OffsetRequest request = new
> kafka.javaapi.OffsetRequest(requestInfo,
> kafka.api.OffsetRequest.CurrentVersion(), clientName);
> OffsetResponse response = consumer.getOffsetsBefore(request);
> if (response.hasError()) {
> System.out.println("Error fetching data Offset Data the Broker.
> Reason: " + response.errorCode(topic, partition));
> return 0;
> }
> long[] offsets = response.offsets(topic, partition);
> return offsets[0];
> }
> /**
> * Return the lead broker for a given topic and partition
> *
> * @param seedBrokers
> * @param port
> * @param topic
> * @param partition
> * @return
> */
> private PartitionMetadata findLeaderAndUpdateSelfPointers(List<String>
> seedBrokers, int port, String topic, int partition) {
> PartitionMetadata returnMetaData = null;
> loop:
> for (String seed : seedBrokers) {
> SimpleConsumer consumer = null;
> try {
> this.consumer = new SimpleConsumer(seed, port, 100000, 64 *
> 1024, "leaderLookup");
> List<String> topics = Collections.singletonList(topic);
> TopicMetadataRequest req = new TopicMetadataRequest(topics);
> kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
> List<TopicMetadata> metaData = resp.topicsMetadata();
> for (TopicMetadata item : metaData) {
> for (PartitionMetadata part : item.partitionsMetadata()) {
> if (part.partitionId() == partition) {
> returnMetaData = part;
> LOG.info("Found leader " +
> returnMetaData.leader().host());
> break loop;
> }
> }
> }
> } catch (Exception e) {
> LOG.info("Error communicating with Broker [" + seed + "] to
> find Leader for [" + topic
> + ", " + partition + "] Reason: " + e);
> } finally {
> if (consumer != null) consumer.close();
> }
> }
> LOG.info("KafkaConsumerDelegate initializing self pointers ");
> if (returnMetaData != null) {
> currentLeader = returnMetaData.leader();
> if (currentLeader != null) {
> this.consumer = new SimpleConsumer(currentLeader.host(),
> currentLeader.port(), 100000, 64 * 1024, clientName);
> }
> }
> LOG.info("KafkaConsumerDelegate: returning metadata");
> return returnMetaData;
> }
> *******************************
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)