[ https://issues.apache.org/jira/browse/FLUME-2250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134256#comment-14134256 ]

Hari Shreedharan commented on FLUME-2250:
-----------------------------------------

I took a look. It looks mostly good; some comments:
- Please use the EventBuilder.withBody method to create the event instead of using the SimpleEvent class directly.
- log.debug("Message: {}", new String(bytes)); should be wrapped in an if (log.isDebugEnabled()) check, so the String is only built when debug logging is actually on.
- If no events were received from Kafka, the getChannelProcessor().processEventBatch(eventList) call is wasted work, so you probably want to check the list size before making it.
- This code should go into the start() method, not configure():
{code}
    try {
      this.consumer = KafkaSourceUtil.getConsumer(context);
    } catch (IOException e) {
      log.error("IOException occur, {}", e.getMessage());
    } catch (InterruptedException e) {
      log.error("InterruptedException occur, {}", e.getMessage());
    }
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
    if(consumerMap == null) {
      throw new ConfigurationException("topicCountMap is null");
    }
    List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
    if(topicList == null || topicList.isEmpty()) {
      throw new ConfigurationException("topicList is null or empty");
    }
    KafkaStream<byte[], byte[]> stream =  topicList.get(0);
    it = stream.iterator();
{code}
- Also, those exceptions don't say what actually went wrong. Is the config bad? Do the topics not exist in Kafka? Keep in mind that configure() is called exactly once: if this is an error that cannot be fixed automatically and requires a config reload, it is fine to keep it there, but if a retry can fix it, this code should be in start(), which is called repeatedly until the component has started.
- Question about this method:
{code}
  IterStatus timedHasNext() {
    try {
      long startTime = System.currentTimeMillis();
      it.hasNext();
      long endTime = System.currentTimeMillis();
      return new IterStatus(true,endTime-startTime);
    } catch (ConsumerTimeoutException e) {
      return new IterStatus(false,consumerTimeout);
    }
  }
{code}
Shouldn't you be checking the return value of it.hasNext()? Or does hasNext() only ever return when there is data (that is, when the result is true)?
- This method is never used: public boolean hasData();
- A couple of nits: many lines are longer than 80 characters, and in many places there is no line break after a comma. Most IDEs can fix this automatically for you.
- Can you add some docs on what the Kafka-specific methods do, so that people can understand them later?
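
For the EventBuilder, debug-logging and empty-batch points, here is a rough sketch of how the batch loop could look. It is only a sketch under assumptions, not the patch's code: the drain() helper and its processBatch callback are hypothetical stand-ins for the source's loop and getChannelProcessor().processEventBatch(), it reads from a plain Iterator<byte[]> rather than the Kafka stream iterator, and it uses java.util.logging's isLoggable(Level.FINE) in place of SLF4J's isDebugEnabled() so it compiles without extra dependencies. In the real source each body would be wrapped with EventBuilder.withBody(body) before going into the batch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper sketching the batch loop; not the patch's actual code.
final class BatchDrain {
  private static final Logger LOG =
      Logger.getLogger(BatchDrain.class.getName());

  private BatchDrain() {
  }

  // Reads up to batchSize message bodies and passes them to processBatch,
  // which stands in for getChannelProcessor().processEventBatch(...).
  // Returns the number of messages in the delivered batch.
  static int drain(Iterator<byte[]> messages, int batchSize,
      Consumer<List<byte[]>> processBatch) {
    List<byte[]> batch = new ArrayList<byte[]>();
    while (batch.size() < batchSize && messages.hasNext()) {
      byte[] body = messages.next();
      // Only build the debug String when that level is actually enabled
      // (the java.util.logging analogue of log.isDebugEnabled()).
      if (LOG.isLoggable(Level.FINE)) {
        LOG.fine("Message: " + new String(body, StandardCharsets.UTF_8));
      }
      // In the Flume source, wrap the bytes here with
      // EventBuilder.withBody(body) and add the resulting Event instead.
      batch.add(body);
    }
    // Skip the channel call entirely when nothing was read.
    if (!batch.isEmpty()) {
      processBatch.accept(batch);
    }
    return batch.size();
  }
}
```

Checking isEmpty() before the hand-off means an idle poll never calls into the channel processor at all.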

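On the timedHasNext() question: a minimal sketch of a version that feeds the result of hasNext() into IterStatus, in case the Kafka iterator can ever return false (for example, after the consumer is shut down). ConsumerTimeoutException below is a local stand-in for Kafka's class (assumed unchecked, as the original catch block implies), and the IterStatus field names and the TimedIter helper are hypothetical, not the patch's actual code.

```java
import java.util.Iterator;

// Local stand-in for kafka.consumer.ConsumerTimeoutException so the sketch
// compiles without Kafka on the classpath; assumed unchecked here.
class ConsumerTimeoutException extends RuntimeException {
  private static final long serialVersionUID = 1L;
}

// Hypothetical shape of the patch's IterStatus: whether data is available
// and how long the check took, in milliseconds.
final class IterStatus {
  final boolean hasData;
  final long waitTimeMs;

  IterStatus(boolean hasData, long waitTimeMs) {
    this.hasData = hasData;
    this.waitTimeMs = waitTimeMs;
  }
}

// Hypothetical holder for the sketch; the patch keeps this on the source.
final class TimedIter {
  private TimedIter() {
  }

  // Uses the result of hasNext() instead of assuming true, plus the time
  // spent waiting; a consumer timeout is reported as "no data".
  static IterStatus timedHasNext(Iterator<?> it, long consumerTimeoutMs) {
    long startTime = System.currentTimeMillis();
    try {
      boolean hasData = it.hasNext();
      long endTime = System.currentTimeMillis();
      return new IterStatus(hasData, endTime - startTime);
    } catch (ConsumerTimeoutException e) {
      return new IterStatus(false, consumerTimeoutMs);
    }
  }
}
```
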
> Add support for Kafka Source
> ----------------------------
>
>                 Key: FLUME-2250
>                 URL: https://issues.apache.org/jira/browse/FLUME-2250
>             Project: Flume
>          Issue Type: Sub-task
>          Components: Sinks+Sources
>    Affects Versions: v1.5.0
>            Reporter: Ashish Paliwal
>            Priority: Minor
>         Attachments: FLUME-2250-0.patch, FLUME-2250-1.patch, FLUME-2250-2.patch, FLUME-2250.patch
>
>
> Add support for Kafka Source



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
