Jyoti Biswas created ZEPPELIN-2746:
--------------------------------------
Summary: Scala Spark Streaming with Kafka integration program does not show output
Key: ZEPPELIN-2746
URL: https://issues.apache.org/jira/browse/ZEPPELIN-2746
Project: Zeppelin
Issue Type: Bug
Components: Interpreters
Affects Versions: 0.7.0
Environment: Config Details: Ambari-managed HDP 2.6 on a 4-node cluster with Spark 2, Kafka, and HBase
Zeppelin interpreter dependencies for Spark 2:
org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.1,
org.apache.kafka:kafka-clients:0.10.1.1, org.apache.kafka:kafka_2.11:0.10.1.1
Reporter: Jyoti Biswas
Fix For: 0.7.0
I have produced 8 messages with the Kafka console producer, such that when I execute the console consumer

./kafka-console-consumer.sh --bootstrap-server vrxhdpkfknod.eastus.cloudapp.azure.com:6667 --topic spark-streaming --from-beginning

all 8 messages are displayed, ending with
^CProcessed a total of 8 messages
When I execute the following Spark 2 code in Zeppelin,
{code:java}
%spark2
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._

sc.setLogLevel("ERROR") // prevent INFO logging from polluting output

// create (or reuse) a StreamingContext with a 5-second batch interval
val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5)))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "vrxhdpkfknod.eastus.cloudapp.azure.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-streaming-example",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("spark-streaming")
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

messages.foreachRDD { rdd =>
  System.out.println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.foreach { record =>
    System.out.print(record.value())
  }
}

ssc.start()
{code}
I get
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@377213ce
kafkaParams: scala.collection.immutable.Map[String,Object] = Map(key.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, auto.offset.reset -> earliest, group.id -> kafka-streaming-example, bootstrap.servers -> vrxhdpkfknod.eastus.cloudapp.azure.com:6667, enable.auto.commit -> false, value.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer)
topics: Array[String] = Array(spark-streaming)
messages: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@328d6f9a
{code}
There are no error messages, but none of the streaming output is ever displayed. The same code works fine when run in the Spark shell.
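One thing worth noting: the closure passed to rdd.foreach runs on the executors, so its stdout goes to the executor logs rather than to the driver, and may therefore never reach the notebook paragraph. A possible diagnostic variant (a sketch only, not verified on this cluster; the 30-second timeout is an arbitrary choice) is to collect the records to the driver before printing, and to return control to the paragraph with awaitTerminationOrTimeout:

{code:java}
// Sketch: same direct stream as above, but printing on the driver so the
// output lands in the notebook. rdd.collect() is acceptable here only
// because the topic holds just a handful of test messages.
messages.foreachRDD { rdd =>
  println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.collect().foreach(record => println(record.value()))
}

ssc.start()
ssc.awaitTerminationOrTimeout(30 * 1000) // give the paragraph back after 30 seconds
{code}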
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)