Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2243#discussion_r130438296
  
    --- Diff: 
examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
 ---
    @@ -42,72 +42,68 @@
     import org.apache.storm.tuple.Values;
     
     public class TridentKafkaClientWordCountNamedTopics {
    +
         private static final String TOPIC_1 = "test-trident";
         private static final String TOPIC_2 = "test-trident-1";
         private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
     
    -    private KafkaTridentSpoutOpaque<String, String> 
newKafkaTridentSpoutOpaque() {
    -        return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
    +    private KafkaTridentSpoutOpaque<String, String> 
newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, String> spoutConfig) {
    +        return new KafkaTridentSpoutOpaque<>(spoutConfig);
         }
     
         private static Func<ConsumerRecord<String, String>, List<Object>> 
JUST_VALUE_FUNC = new JustValueFunc();
     
         /**
    -     * Needs to be serializable
    +     * Needs to be serializable.
          */
         private static class JustValueFunc implements 
Func<ConsumerRecord<String, String>, List<Object>>, Serializable {
    +
             @Override
             public List<Object> apply(ConsumerRecord<String, String> record) {
                 return new Values(record.value());
             }
         }
     
    -    protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
    -        return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1, 
TOPIC_2)
    -                .setProp(ConsumerConfig.GROUP_ID_CONFIG, 
"kafkaSpoutTestGroup_" + System.nanoTime())
    -                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
200)
    -                .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
    -                .setRetry(newRetryService())
    -                .setOffsetCommitPeriodMs(10_000)
    -                .setFirstPollOffsetStrategy(EARLIEST)
    -                .setMaxUncommittedOffsets(250)
    -                .build();
    +    protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig(String 
bootstrapServers) {
    +        return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2)
    +            .setProp(ConsumerConfig.GROUP_ID_CONFIG, 
"kafkaSpoutTestGroup_" + System.nanoTime())
    +            .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
    +            .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
    +            .setRetry(newRetryService())
    +            .setOffsetCommitPeriodMs(10_000)
    +            .setFirstPollOffsetStrategy(EARLIEST)
    +            .setMaxUncommittedOffsets(250)
    +            .build();
         }
     
         protected KafkaSpoutRetryService newRetryService() {
             return new KafkaSpoutRetryExponentialBackoff(new 
TimeInterval(500L, TimeUnit.MICROSECONDS),
    -                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, 
TimeInterval.seconds(10));
    +            TimeInterval.milliSeconds(2), Integer.MAX_VALUE, 
TimeInterval.seconds(10));
         }
     
         public static void main(String[] args) throws Exception {
             new TridentKafkaClientWordCountNamedTopics().run(args);
         }
     
    -    protected void run(String[] args) throws AlreadyAliveException, 
InvalidTopologyException, AuthorizationException, InterruptedException {
    -        if (args.length > 0 && Arrays.stream(args).anyMatch(option -> 
option.equals("-h"))) {
    -            System.out.printf("Usage: java %s [%s] [%s] [%s] [%s]\n", 
getClass().getName(),
    -                    "broker_host:broker_port", "topic1", "topic2", 
"topology_name");
    -        } else {
    -            final String brokerUrl = args.length > 0 ? args[0] : 
KAFKA_LOCAL_BROKER;
    -            final String topic1 = args.length > 1 ? args[1] : TOPIC_1;
    -            final String topic2 = args.length > 2 ? args[2] : TOPIC_2;
    -
    -            System.out.printf("Running with broker_url: [%s], topics: [%s, 
%s]\n", brokerUrl, topic1, topic2);
    -
    -            Config tpConf = new Config();
    -            tpConf.setDebug(true);
    -            tpConf.setMaxSpoutPending(5);
    -
    -            // Producers
    -            StormSubmitter.submitTopology(topic1 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, topic1));
    -            StormSubmitter.submitTopology(topic2 + "-producer", tpConf, 
KafkaProducerTopology.newTopology(brokerUrl, topic2));
    -            // Consumer
    -            StormSubmitter.submitTopology("topics-consumer", tpConf, 
TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
    -
    -            // Print results to console, which also causes the print 
filter in the consumer topology to print the results in the worker log
    -            Thread.sleep(2000);
    -            DrpcResultsPrinter.remoteClient().printResults(60, 1, 
TimeUnit.SECONDS);
    -        }
    +    protected void run(String[] args) throws AlreadyAliveException, 
InvalidTopologyException,
    --- End diff --
    
    As far as I could tell, the topic names were previously only being passed to 
the producers, not to the consumer, so the example didn't work if you supplied 
the topic parameters. Fixing that caused a conflict with the wildcard example, 
because I'd have to change the newKafkaSpoutConfig signature to take a list of 
topics, and that won't work with the Pattern required by the wildcard example. 
It seemed easier to just remove the option.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to