[ https://issues.apache.org/jira/browse/METRON-793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938450#comment-15938450 ]

ASF GitHub Bot commented on METRON-793:
---------------------------------------

Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/486#discussion_r107683860
  
    --- Diff: metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java ---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.metron.storm.kafka.flux;
    +
    +import com.google.common.base.Joiner;
    +import org.apache.kafka.clients.consumer.Consumer;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    +import org.apache.metron.common.utils.KafkaUtils;
    +import org.apache.storm.kafka.spout.*;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.OutputFieldsGetter;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Function;
    +
    +/**
    + * This is a convenience layer on top of the KafkaSpoutConfig.Builder available in storm-kafka-client.
    + * The justification for this class is three-fold.  First, there are a lot of moving parts and a simplified
    + * approach to constructing spouts is useful.  Second, and perhaps more importantly, the Builder pattern
    + * is decidedly unfriendly to use inside of Flux.  Finally, we can make things a bit more friendly by only requiring
    + * zookeeper and automatically figuring out the brokers for the bootstrap server.
    + *
    + * @param <K> The kafka key type
    + * @param <V> The kafka value type
    + */
    +public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V> {
    +  final static String STREAM = "default";
    +
    +  /**
    +   * The fields exposed by the kafka consumer.  These will show up in the Storm tuple.
    +   */
    +  public enum FieldsConfiguration {
    +    KEY("key", record -> record.key()),
    +    VALUE("value", record -> record.value()),
    +    PARTITION("partition", record -> record.partition()),
    +    TOPIC("topic", record -> record.topic())
    +    ;
    +    String fieldName;
    +    Function<ConsumerRecord,Object> recordExtractor;
    +
    +    FieldsConfiguration(String fieldName, Function<ConsumerRecord,Object> recordExtractor) {
    +      this.recordExtractor = recordExtractor;
    +      this.fieldName = fieldName;
    +    }
    +
    +    /**
    +     * Return a list of the enums from their string representations.
    +     * @param configs The enum names (case-insensitive)
    +     * @return The corresponding list of FieldsConfiguration enums
    +     */
    +    public static List<FieldsConfiguration> toList(String... configs) {
    +      List<FieldsConfiguration> ret = new ArrayList<>();
    +      for(String config : configs) {
    +        ret.add(FieldsConfiguration.valueOf(config.toUpperCase()));
    +      }
    +      return ret;
    +    }
    +
    +    /**
    +     * Return a list of the enums from their string representation.
    +     * @param configs The enum names (case-insensitive)
    +     * @return The corresponding list of FieldsConfiguration enums
    +     */
    +    public static List<FieldsConfiguration> toList(List<String> configs) {
    +      List<FieldsConfiguration> ret = new ArrayList<>();
    +      for(String config : configs) {
    +        ret.add(FieldsConfiguration.valueOf(config.toUpperCase()));
    +      }
    +      return ret;
    +    }
    +
    +    /**
    +     * Construct a Fields object from an iterable of enums.  These fields are the fields
    +     * exposed in the Storm tuple emitted from the spout.
    +     * @param configs The enums whose field names should be exposed
    +     * @return The Fields object backing the emitted tuples
    +     */
    +    public static Fields getFields(Iterable<FieldsConfiguration> configs) {
    +      List<String> fields = new ArrayList<>();
    +      for(FieldsConfiguration config : configs) {
    +        fields.add(config.fieldName);
    +      }
    +      return new Fields(fields);
    +    }
    +  }
    +
    +  /**
    +   * Build a tuple given the fields and the topic.  We want to use our FieldsConfiguration enum
    +   * to define what this tuple looks like.
    +   * @param <K> The key type in kafka
    +   * @param <V> The value type in kafka
    +   */
    +  public static class TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
    +    private List<FieldsConfiguration> configurations;
    +    private TupleBuilder(String topic, List<FieldsConfiguration> configurations) {
    +      super(topic);
    +      this.configurations = configurations;
    +    }
    +
    +    /**
    +     * Builds a tuple, as a list of values, from the ConsumerRecord specified as a parameter.
    +     *
    +     * @param consumerRecord The record whose contents are used to build the tuple
    +     * @return The tuple values
    +     */
    +    @Override
    +    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
    +      Values ret = new Values();
    +      for(FieldsConfiguration config : configurations) {
    +        ret.add(config.recordExtractor.apply(consumerRecord));
    +      }
    +      return ret;
    +    }
    +  }
    +
    +  private String topic;
    +
    +  /**
    +   * Create an object with the specified properties.  This will expose fields "key" and "value."
    +   * @param kafkaProps The kafka properties used to configure the consumer
    +   * @param topic The kafka topic. TODO: In the future, support multiple topics and regex patterns.
    +   * @param zkQuorum The zookeeper quorum.  We will use this to pull the brokers for the bootstrap server.
    +   */
    +  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
    +                                , String topic
    +                                , String zkQuorum
    +                                )
    +  {
    +    this(kafkaProps, topic, zkQuorum, Arrays.asList("key", "value"));
    +  }
    +
    +  /**
    +   * Create an object with the specified properties, exposing the specified fields.
    +   * @param kafkaProps The kafka properties used to configure the consumer
    +   * @param topic The kafka topic. TODO: In the future, support multiple topics and regex patterns.
    +   * @param zkQuorum The zookeeper quorum.  We will use this to pull the brokers for the bootstrap server.
    +   * @param fieldsConfiguration The fields to expose in the emitted storm tuple.
    +   */
    +  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
    +                                , String topic
    +                                , String zkQuorum
    +                                , List<String> fieldsConfiguration
    +                                )
    +  {
    +    super( modifyKafkaProps(kafkaProps, zkQuorum)
    +         , createStreams(fieldsConfiguration, topic)
    +         , createTuplesBuilder(fieldsConfiguration, topic)
    +         );
    +    this.topic = topic;
    +  }
    +
    +  /**
    +   * Get the kafka topic.  TODO: In the future, support multiple topics and regex patterns.
    +   * @return The kafka topic
    +   */
    +  public String getTopic() {
    +    return topic;
    +  }
    +
    +  /**
    +   * Create a StormKafkaSpout from a given topic, zookeeper quorum and fields.  Also, configure the spout
    +   * using a Map that configures both kafka as well as the spout (see the properties in SpoutConfiguration).
    +   * @param topic The kafka topic
    +   * @param zkQuorum The zookeeper quorum, used to look up the kafka brokers
    +   * @param fieldsConfiguration The fields to expose in the emitted storm tuple
    +   * @param kafkaProps  The aforementioned map configuring both kafka and the spout.
    +   * @return The constructed StormKafkaSpout
    +   */
    +  public static StormKafkaSpout create( String topic
    --- End diff ---
    
    yep, agreed
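
A minimal sketch of how the builder in the diff above might be wired into a spout, assuming the storm-kafka-client 1.x KafkaSpout/KafkaSpoutConfig API; the topic name, zookeeper quorum, group id and deserializer settings below are illustrative placeholders, not values from the patch:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
    import org.apache.storm.kafka.spout.KafkaSpout;

    public class SpoutWiringSketch {
      public static KafkaSpout<byte[], byte[]> buildSpout() {
        // Kafka consumer properties; the group id and deserializers here are placeholders.
        Map<String, Object> kafkaProps = new HashMap<>();
        kafkaProps.put("group.id", "metron_sketch");
        kafkaProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        kafkaProps.put("value.deserializer", ByteArrayDeserializer.class.getName());

        // Only the zookeeper quorum is supplied; per the class javadoc, the builder
        // resolves the bootstrap brokers from zookeeper.
        SimpleStormKafkaBuilder<byte[], byte[]> builder =
            new SimpleStormKafkaBuilder<>(kafkaProps, "enrichments", "node1:2181",
                                          Arrays.asList("key", "value"));

        // SimpleStormKafkaBuilder extends KafkaSpoutConfig.Builder, so build() yields
        // a KafkaSpoutConfig that the storm-kafka-client KafkaSpout accepts.
        return new KafkaSpout<>(builder.build());
      }
    }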


> Migrate to storm-kafka-client kafka spout from storm-kafka
> ----------------------------------------------------------
>
>                 Key: METRON-793
>                 URL: https://issues.apache.org/jira/browse/METRON-793
>             Project: Metron
>          Issue Type: Improvement
>            Reporter: Casey Stella
>
> In order to eventually support kerberos, the suggested path is to migrate to 
> the new kafka spout (org.apache.storm:storm-kafka-client) which uses the new 
> consumer API.
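
For context on "the new consumer API": a minimal sketch of the kafka-clients consumer that storm-kafka-client builds on, assuming kafka-clients 0.10.x; the broker address, group id, topic, and the SASL setting a kerberized deployment would add are illustrative placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewConsumerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:6667");   // placeholder broker
        props.put("group.id", "metron_sketch");         // placeholder consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // A kerberized deployment would additionally set the security protocol, e.g.:
        // props.put("security.protocol", "SASL_PLAINTEXT");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("enrichments"));
          ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
          for (ConsumerRecord<byte[], byte[]> record : records) {
            System.out.println(record.topic() + "/" + record.partition());
          }
        }
      }
    }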



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
