[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15073020#comment-15073020
 ] 

Mario Briggs edited comment on SPARK-12177 at 12/28/15 7:02 PM:
----------------------------------------------------------------

  Hi Nikita,
 thanks. Here are my review comments. I couldn't find a way to add them to the 
PR review on GitHub, so I have added them here. My comments are a little 
detailed, since I too developed an implementation and tested it along with my 
colleague Praveen :-) after the initial discussion on the dev list.

A - KafkaCluster class
     getPartitions()
     seek()
        callers of the above methods
        all other methods that use withConsumer()

 These should not return an ‘Either’, but rather just the expected object (the 
‘Right’). The earlier code returned an ‘Either’ because the earlier Kafka 
client had to try the operation on all the ‘seedBrokers’ and handle the case 
where some of them were down. Similarly, when dealing with ‘leaders’, the 
client had to try the operation on all leaders for a TP (TopicAndPartition). 
With the new kafka-clients API, we don’t have to deal with trying against all 
the seedBrokers, leaders etc., since the new KafkaConsumer object handles all 
those details internally.
Notice that in the earlier code, withBrokers() tries to connect() and invoke 
the passed-in function once per broker via brokers.forEach(), hence the need 
to accumulate errors. The earlier code also did a ‘return’ immediately when it 
succeeded with one of the brokers. Neither applies with the new KafkaConsumer 
object.
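
To make the difference concrete, here is a minimal sketch in plain Scala with 
no Kafka dependency. The names withBrokersSketch and withConsumerSketch are 
hypothetical and are not the actual KafkaCluster code. The first mimics the 
old try-every-broker loop, which needs an ‘Either’ to carry the accumulated 
errors; the second shows the simpler shape when a single client object hides 
broker and leader discovery.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object EitherVsDirectSketch {
  type Err = ArrayBuffer[Throwable]

  // Old shape (illustrative): try each broker in turn, collect the failures,
  // and stop at the first broker that answers.
  def withBrokersSketch[T](brokers: Seq[String])(op: String => T): Either[Err, T] = {
    val errs = ArrayBuffer.empty[Throwable]
    val it = brokers.iterator
    while (it.hasNext) {
      val broker = it.next()
      try {
        return Right(op(broker))
      } catch {
        case NonFatal(e) => errs += e
      }
    }
    Left(errs)
  }

  // New shape (illustrative): one client handles brokers and leaders itself,
  // so the call either yields the value or throws. Nothing to accumulate.
  def withConsumerSketch[C, T](consumer: C)(op: C => T): T = op(consumer)
}
```

With the second shape, callers receive the plain result and failures surface 
as ordinary exceptions.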

getPartitions() - 
https://github.com/nikit-os/spark/blob/kafka-09-consumer-api/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala#L62
  The consumer.partitionsFor() Java API returns null if the topic doesn’t 
exist. If you don’t handle that, you run into an NPE when the user specifies a 
topic that doesn’t exist or makes a typo in the topic name. (Failing without 
an exception saying the partition doesn’t exist is also not right.)
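
One possible way to guard against the null, sketched in plain Scala. The 
helper name partitionsOrFail, the choice of IllegalArgumentException and the 
message text are my assumptions for illustration; the `lookup` parameter 
stands in for the real consumer call so the snippet has no Kafka dependency.

```scala
object PartitionLookupSketch {
  // `lookup` stands in for a call such as consumer.partitionsFor(topic),
  // which may hand back null for an unknown topic.
  def partitionsOrFail[A](
      topic: String,
      lookup: String => java.util.List[A]): java.util.List[A] =
    // Option(null) is None, so the null never escapes to the caller.
    Option(lookup(topic)).getOrElse(
      throw new IllegalArgumentException(s"Topic '$topic' does not exist"))
}
```

With a real consumer one would pass something like 
`t => consumer.partitionsFor(t)` as the lookup.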

Our implementation is at - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 If it would be easier for you that we issue a PR to your repo, let us know.

B - KafkaRDD class
   getPreferredLocations()
  This method is missing in your code. The earlier implementation from Cody had 
the optimisation that if Kafka and the Spark code (KafkaRDD) were running on 
the same cluster, then the RDD partition for a particular TopicPartition would 
be local to that TopicPartition's leader. Could you please add code to bring 
back this functionality?

 Our implementation pulled this info inside getPartitions() - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L52
 It is probably more efficient to do it inside compute() of the DStream, but 
that meant exposing ‘leaders’ on the KafkaRDD constructor, as discussed at - 
https://mail-archives.apache.org/mod_mbox/spark-dev/201512.mbox/%3CCAKWX9VV34Dto9irT3ZZfH78EoXO3bV3VHN8YYvTxfnyvGcRsQw%40mail.gmail.com%3E
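
For illustration only, the locality hint could look roughly like this. 
PartitionSketch and its leaderHost field are hypothetical stand-ins and are 
not taken from either implementation; the function mirrors the shape of 
RDD.getPreferredLocations, which returns a Seq[String] of preferred host 
names.

```scala
object PreferredLocationsSketch {
  // Hypothetical stand-in for an RDD partition that remembers the host of
  // the Kafka leader for its topic partition (None when it is unknown).
  final case class PartitionSketch(
      topic: String,
      partition: Int,
      leaderHost: Option[String])

  // Prefer the leader's host when known; an empty list means "no preference".
  def preferredLocations(p: PartitionSketch): Seq[String] = p.leaderHost.toList
}
```

When the leader host is unknown, the empty result simply leaves the scheduler 
free to place the task anywhere.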

C - KafkaRDDIterator class

 getNext()
  As mentioned in issue #1 noted here - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/README.md#issues-noted-with-kafka-090-consumer-api
 KafkaConsumer.poll(x) is not guaranteed to return data when x is a small 
value. We are following up on this issue with Kafka - 
https://issues.apache.org/jira/browse/KAFKA-3044 . I see that you have made 
this a configurable value in your implementation, which is good. Either way, 
whether or not this behaviour is clarified, we need this assert - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L171
 or else we will be silently skipping data without the user knowing it (with 
either the default value or a smaller user-specified value).
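
A rough sketch of the kind of check meant here. This is not a copy of the 
linked assert; RecordSketch and nextOrFail are hypothetical names, and the 
iterator stands in for whatever a poll handed back. The point is only that an 
empty or out-of-sequence result fails loudly instead of being skipped.

```scala
object OffsetCheckSketch {
  // Hypothetical stand-in for a consumed record.
  final case class RecordSketch(offset: Long, value: String)

  // Return the next record only if it is exactly the offset we asked for.
  def nextOrFail(requestOffset: Long, polled: Iterator[RecordSketch]): RecordSketch = {
    if (!polled.hasNext) {
      throw new IllegalStateException(
        s"poll returned no data while expecting offset $requestOffset")
    }
    val record = polled.next()
    if (record.offset != requestOffset) {
      throw new IllegalStateException(
        s"expected offset $requestOffset but got ${record.offset}")
    }
    record
  }
}
```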

D - Non-use of the TopicPartition class of the new consumer

   You have already figured out that this class is not Serializable, and hence 
you have used the older TopicAndPartition class in the public interface. We 
have raised this issue https://issues.apache.org/jira/browse/KAFKA-3029 with 
Kafka and may be provided with a serializable one (yet to see). However, using 
the older TopicAndPartition class in our public API introduces a dependency on 
the older kafka core rather than just the kafka-clients jar, which I would 
think is not the preferred approach. If we are not provided with a 
serializable TopicPartition, then we should rather use our own serializable 
object (or just a tuple of String, Int, Long) inside DirectKafkaInputDStream 
to ensure it is Serializable.
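
As a sketch of the "own serializable object" option: Scala case classes and 
tuples are Serializable out of the box, so either would do. TopicPartitionKey 
and roundTrip are hypothetical names used only for this illustration; 
roundTrip pushes a value through plain Java serialization to show that it 
survives.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}

object SerializableTpSketch {
  // Hypothetical replacement key; case classes are Serializable by default.
  final case class TopicPartitionKey(topic: String, partition: Int)

  // Serialize a value to bytes and read it back with Java serialization.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The tuple form would carry topic, partition and offset, for example 
("events", 0, 42L), and goes through roundTrip in the same way.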



> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --------------------------------------------------
>
>                 Key: SPARK-12177
>                 URL: https://issues.apache.org/jira/browse/SPARK-12177
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.6.0
>            Reporter: Nikita Tarasenko
>              Labels: consumer, kafka
>
> Kafka 0.9 has already been released, and it introduces a new consumer API 
> that is not compatible with the old one. So I added the new consumer API. I 
> made separate classes in package org.apache.spark.streaming.kafka.v09 with 
> the changed API. I didn't remove the old classes, for backward compatibility: 
> users will not need to change their old Spark applications when they upgrade 
> to the new Spark version.
> Please review my changes.


