Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2243#discussion_r130438296
--- Diff:
examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
---
@@ -42,72 +42,68 @@
import org.apache.storm.tuple.Values;
public class TridentKafkaClientWordCountNamedTopics {
+
private static final String TOPIC_1 = "test-trident";
private static final String TOPIC_2 = "test-trident-1";
private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
- private KafkaTridentSpoutOpaque<String, String>
newKafkaTridentSpoutOpaque() {
- return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
+ private KafkaTridentSpoutOpaque<String, String>
newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, String> spoutConfig) {
+ return new KafkaTridentSpoutOpaque<>(spoutConfig);
}
private static Func<ConsumerRecord<String, String>, List<Object>>
JUST_VALUE_FUNC = new JustValueFunc();
/**
- * Needs to be serializable
+ * Needs to be serializable.
*/
private static class JustValueFunc implements
Func<ConsumerRecord<String, String>, List<Object>>, Serializable {
+
@Override
public List<Object> apply(ConsumerRecord<String, String> record) {
return new Values(record.value());
}
}
- protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
- return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1,
TOPIC_2)
- .setProp(ConsumerConfig.GROUP_ID_CONFIG,
"kafkaSpoutTestGroup_" + System.nanoTime())
- .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
200)
- .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
- .setRetry(newRetryService())
- .setOffsetCommitPeriodMs(10_000)
- .setFirstPollOffsetStrategy(EARLIEST)
- .setMaxUncommittedOffsets(250)
- .build();
+ protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig(String
bootstrapServers) {
+ return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2)
+ .setProp(ConsumerConfig.GROUP_ID_CONFIG,
"kafkaSpoutTestGroup_" + System.nanoTime())
+ .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
+ .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
+ .setRetry(newRetryService())
+ .setOffsetCommitPeriodMs(10_000)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setMaxUncommittedOffsets(250)
+ .build();
}
protected KafkaSpoutRetryService newRetryService() {
return new KafkaSpoutRetryExponentialBackoff(new
TimeInterval(500L, TimeUnit.MICROSECONDS),
- TimeInterval.milliSeconds(2), Integer.MAX_VALUE,
TimeInterval.seconds(10));
+ TimeInterval.milliSeconds(2), Integer.MAX_VALUE,
TimeInterval.seconds(10));
}
public static void main(String[] args) throws Exception {
new TridentKafkaClientWordCountNamedTopics().run(args);
}
- protected void run(String[] args) throws AlreadyAliveException,
InvalidTopologyException, AuthorizationException, InterruptedException {
- if (args.length > 0 && Arrays.stream(args).anyMatch(option ->
option.equals("-h"))) {
- System.out.printf("Usage: java %s [%s] [%s] [%s] [%s]\n",
getClass().getName(),
- "broker_host:broker_port", "topic1", "topic2",
"topology_name");
- } else {
- final String brokerUrl = args.length > 0 ? args[0] :
KAFKA_LOCAL_BROKER;
- final String topic1 = args.length > 1 ? args[1] : TOPIC_1;
- final String topic2 = args.length > 2 ? args[2] : TOPIC_2;
-
- System.out.printf("Running with broker_url: [%s], topics: [%s,
%s]\n", brokerUrl, topic1, topic2);
-
- Config tpConf = new Config();
- tpConf.setDebug(true);
- tpConf.setMaxSpoutPending(5);
-
- // Producers
- StormSubmitter.submitTopology(topic1 + "-producer", tpConf,
KafkaProducerTopology.newTopology(brokerUrl, topic1));
- StormSubmitter.submitTopology(topic2 + "-producer", tpConf,
KafkaProducerTopology.newTopology(brokerUrl, topic2));
- // Consumer
- StormSubmitter.submitTopology("topics-consumer", tpConf,
TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
-
- // Print results to console, which also causes the print
filter in the consumer topology to print the results in the worker log
- Thread.sleep(2000);
- DrpcResultsPrinter.remoteClient().printResults(60, 1,
TimeUnit.SECONDS);
- }
+ protected void run(String[] args) throws AlreadyAliveException,
InvalidTopologyException,
--- End diff --
The topic names weren't being passed to the consumer before, only to the
producers as far as I could tell, so the example didn't work if you used the
topic parameters. Fixing that caused a conflict with the wildcard example,
because I'd have to change the newKafkaSpoutConfig signature to take a list of
topics, and that won't work with the Pattern required by the wildcard example.
It seemed easier to just remove the option.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---