[ https://issues.apache.org/jira/browse/SPARK-20238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

rayu yuan updated SPARK-20238:
------------------------------
    Description: 
My question is: is the example at
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
correct?

I'm pretty new to Spark. I wanted to find an example of Spark Streaming in Java that reads from Kafka. JavaDirectKafkaWordCount at
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
looked perfect.

I copied the code below:
{code}
SparkConf sparkConf = new SparkConf()
        .setAppName("JavaDirectKafkaWordCount")
        .setMaster("spark://slc:7077");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.0.1.2:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Collections.singletonList("test");

final Logger log = LogManager.getLogger(JavaDirectKafkaWordCount.class);

final JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
stream.print();
{code}
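For context, the example presumably continues from here with the usual word-count steps. A sketch of that continuation (my own variable names and whitespace splitting, imports omitted as in the snippet above, not copied verbatim from the example):

{code}
// Sketch only: the usual DStream word-count continuation plus starting the context.
JavaDStream<String> lines = stream.map(ConsumerRecord::value);    // Kafka record -> message value
JavaDStream<String> words = lines.flatMap(
        line -> Arrays.asList(line.split(" ")).iterator());       // split each line into words
JavaPairDStream<String, Integer> wordCounts = words
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((a, b) -> a + b);                             // count occurrences per batch
wordCounts.print();

jssc.start();              // nothing runs until the context is started
jssc.awaitTermination();   // keep the driver alive while the streaming job runs
{code}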

When I ran it, it appeared to throw this error:
{code}
17/04/05 22:43:10 INFO SparkContext: Starting job: print at JavaDirectKafkaWordCount.java:47
17/04/05 22:43:10 INFO DAGScheduler: Got job 0 (print at JavaDirectKafkaWordCount.java:47) with 1 output partitions
17/04/05 22:43:10 INFO DAGScheduler: Final stage: ResultStage 0 (print at JavaDirectKafkaWordCount.java:47)
17/04/05 22:43:10 INFO DAGScheduler: Parents of final stage: List()
17/04/05 22:43:10 INFO DAGScheduler: Missing parents: List()
17/04/05 22:43:10 INFO DAGScheduler: Submitting ResultStage 0 (KafkaRDD[0] at createDirectStream at JavaDirectKafkaWordCount.java:44), which has no missing parents
17/04/05 22:43:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.3 KB, free 366.3 MB)
17/04/05 22:43:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1529.0 B, free 366.3 MB)
17/04/05 22:43:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.245.226.155:15258 (size: 1529.0 B, free: 366.3 MB)
17/04/05 22:43:10 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/04/05 22:43:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (KafkaRDD[0] at createDirectStream at JavaDirectKafkaWordCount.java:44)
17/04/05 22:43:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/04/05 22:43:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.245.226.155:53448) with ID 0
17/04/05 22:43:10 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.245.226.155, executor 0, partition 0, PROCESS_LOCAL, 7295 bytes)
17/04/05 22:43:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.245.226.155:14669 with 366.3 MB RAM, BlockManagerId(0, 10.245.226.155, 14669, None)
17/04/05 22:43:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.245.226.155:53447) with ID 1
17/04/05 22:43:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.245.226.155:33754 with 366.3 MB RAM, BlockManagerId(1, 10.245.226.155, 33754, None)
17/04/05 22:43:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.245.226.155, executor 0): java.lang.NullPointerException
        at org.apache.spark.util.Utils$.decodeFileNameInURI(Utils.scala:409)
        at org.apache.spark.util.Utils$.fetchFile(Utils.scala:434)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:508)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:500)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:500)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:257)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}

So is the example at
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
correct, or is there something I could have done differently to get it working?
Also, how can I debug Spark jobs and their logging?
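On the logging side, a minimal sketch of one way to cut the INFO noise while debugging (this relies on setLogLevel, which Spark 2.x exposes on the context; per-executor stderr logs are still reachable from the web UI):

{code}
// Lower the driver-side log level to WARN before starting the job
// (setLogLevel exists on SparkContext/JavaSparkContext in Spark 2.x):
jssc.sparkContext().setLogLevel("WARN");
{code}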

  was:
My question is: is the example at
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
correct?

I'm pretty new to both Spark and Java. I wanted to find an example of Spark Streaming in Java that reads from Kafka. JavaKafkaWordCount at
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
looked perfect.

However, when I tried running it, I found a couple of issues that I needed to 
overcome.

1. This line was unnecessary:
{code}
StreamingExamples.setStreamingLogLevels();
{code}

Having this line in there (and the associated import) sent me looking for the spark-examples_2.10 dependency, which was of no real use to me; a possible stand-in is sketched right below.
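A dependency-free stand-in that I would expect to behave similarly (an assumption on my part, relying on the log4j 1.x API that Spark 2.1 bundles):

{code}
// Hypothetical replacement for StreamingExamples.setStreamingLogLevels(),
// assuming log4j 1.x (org.apache.log4j) is on the classpath:
org.apache.log4j.Logger.getLogger("org.apache.spark")
        .setLevel(org.apache.log4j.Level.WARN);
{code}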

2. After running it, this line: 
{code}
JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
{code}

This appeared to throw an error around logging:
{code}
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:91
        at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:66
        at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:11
        at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
        at main.java.com.cm.JavaKafkaWordCount.main(JavaKafkaWordCount.java:72)
{code}

To get around this, I found that the code sample in
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
helped me come up with the right lines to see streaming from Kafka in action. Specifically, it calls createDirectStream instead of createStream.

So is the example at
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
correct, or is there something I could have done differently to get it working?


>  Is the JavaDirectKafkaWordCount example correct for Spark version 2.1?
> -----------------------------------------------------------------------
>
>                 Key: SPARK-20238
>                 URL: https://issues.apache.org/jira/browse/SPARK-20238
>             Project: Spark
>          Issue Type: Question
>          Components: Examples, ML
>    Affects Versions: 2.1.0
>            Reporter: rayu yuan
>


