Repository: spark Updated Branches: refs/heads/master 7bd6d5412 -> ac78bcce0
[SPARK-24743][EXAMPLES] Update the JavaDirectKafkaWordCount example to support the new Kafka consumer API ## What changes were proposed in this pull request? Add the required Kafka consumer configs to the JavaDirectKafkaWordCount class. ## How was this patch tested? Tested manually in local mode. Author: cluo <[email protected]> Closes #21717 from cluo512/SPARK-24743-update-JavaDirectKafkaWordCount. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac78bcce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac78bcce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac78bcce Branch: refs/heads/master Commit: ac78bcce00ff8ec8e5b7335c2807aa0cd0f5406a Parents: 7bd6d54 Author: cluo <[email protected]> Authored: Thu Jul 5 09:06:25 2018 -0500 Committer: cody koeninger <[email protected]> Committed: Thu Jul 5 09:06:25 2018 -0500 ---------------------------------------------------------------------- .../streaming/JavaDirectKafkaWordCount.java | 24 +++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ac78bcce/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java index b6b163f..748bf58 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -26,7 +26,9 @@ import java.util.regex.Pattern; import scala.Tuple2; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import 
org.apache.kafka.common.serialization.StringDeserializer; import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.*; @@ -37,30 +39,33 @@ import org.apache.spark.streaming.Durations; /** * Consumes messages from one or more topics in Kafka and does wordcount. - * Usage: JavaDirectKafkaWordCount <brokers> <topics> + * Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics> * <brokers> is a list of one or more Kafka brokers + * <groupId> is a consumer group name to consume from topics * <topics> is a list of one or more kafka topics to consume from * * Example: * $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \ - * topic1,topic2 + * consumer-group topic1,topic2 */ public final class JavaDirectKafkaWordCount { private static final Pattern SPACE = Pattern.compile(" "); public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" + - " <brokers> is a list of one or more Kafka brokers\n" + - " <topics> is a list of one or more kafka topics to consume from\n\n"); + if (args.length < 3) { + System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>\n" + + " <brokers> is a list of one or more Kafka brokers\n" + + " <groupId> is a consumer group name to consume from topics\n" + + " <topics> is a list of one or more kafka topics to consume from\n\n"); System.exit(1); } StreamingExamples.setStreamingLogLevels(); String brokers = args[0]; - String topics = args[1]; + String groupId = args[1]; + String topics = args[2]; // Create context with a 2 seconds batch interval SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount"); @@ -68,7 +73,10 @@ public final class JavaDirectKafkaWordCount { Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(","))); Map<String, Object> kafkaParams = new HashMap<>(); - kafkaParams.put("metadata.broker.list", brokers); + 
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Create direct kafka stream with brokers and topics JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
