Jyoti Biswas created ZEPPELIN-2746:
--------------------------------------

             Summary: Scala Spark-Streaming with Kafka Integration program does 
not show output
                 Key: ZEPPELIN-2746
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-2746
             Project: Zeppelin
          Issue Type: Bug
          Components: Interpreters
    Affects Versions: 0.7.0
         Environment: Config Details: Ambari-managed HDP 2.6 on a 4-node 
cluster running Spark 2, Kafka, and HBase
Zeppelin interpreter dependencies for Spark 2: 
org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.1, 
org.apache.kafka:kafka-clients:0.10.1.1, org.apache.kafka:kafka_2.11:0.10.1.1
            Reporter: Jyoti Biswas
             Fix For: 0.7.0


I produced 8 messages with the Kafka console producer. When I run the console consumer

./kafka-console-consumer.sh --bootstrap-server vrxhdpkfknod.eastus.cloudapp.azure.com:6667 --topic spark-streaming --from-beginning

all 8 messages are displayed, and on Ctrl+C it reports:

^CProcessed a total of 8 messages
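
For completeness, the same test messages could also be sent from a Zeppelin paragraph with the kafka-clients producer API that is already on the interpreter classpath. A minimal sketch (the message bodies here are illustrative; the actual messages were sent with the console producer):

{code:java}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "vrxhdpkfknod.eastus.cloudapp.azure.com:6667")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// send 8 test messages to the topic consumed below
val producer = new KafkaProducer[String, String](props)
(1 to 8).foreach { i =>
  producer.send(new ProducerRecord[String, String]("spark-streaming", s"message-$i"))
}
producer.close()
{code}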
When I execute the following Spark 2 code in Zeppelin,


{code:java}
%spark2
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._

sc.setLogLevel("ERROR")  // prevent INFO logging from polluting the output

// reuse an active StreamingContext if one exists, otherwise create one
// with a 5-second batch interval
val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5)))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "vrxhdpkfknod.eastus.cloudapp.azure.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-streaming-example",
  "auto.offset.reset" -> "earliest",  // read from the beginning when no committed offset exists
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("spark-streaming")
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

messages.foreachRDD { rdd =>
  println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.foreach { record =>
    println(record.value())
  }
}

ssc.start()
{code}
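
Note that the println inside rdd.foreach runs on the executors, so its stdout would land in the executor logs rather than the notebook in any case; only the per-batch count line runs on the driver. A driver-side variant (a sketch, assuming each 5-second batch is small enough to collect) would be:

{code:java}
messages.foreachRDD { rdd =>
  // collect() ships the batch to the driver, so these println calls execute
  // on the driver rather than on the executors; only safe for small test batches
  rdd.collect().foreach(record => println(record.value()))
}
{code}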

I get only the following interpreter output:


{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._

ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@377213ce
kafkaParams: scala.collection.immutable.Map[String,Object] = Map(key.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, auto.offset.reset -> earliest, group.id -> kafka-streaming-example, bootstrap.servers -> vrxhdpkfknod.eastus.cloudapp.azure.com:6667, enable.auto.commit -> false, value.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer)
topics: Array[String] = Array(spark-streaming)
messages: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@328d6f9a
{code}

There are no error messages, but none of the streaming output (neither the per-batch record counts nor the message values) ever appears in the notebook. The same code works fine when run in the Spark shell.
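
One relevant difference from the Spark shell: the Zeppelin paragraph completes as soon as ssc.start() returns, while the 5-second batches only fire afterwards, so their output may have no running paragraph to attach to. A workaround sketch that keeps the paragraph alive (the 60-second timeout is an arbitrary value for testing):

{code:java}
ssc.start()
// block the paragraph so output from the 5-second batches is produced
// while the paragraph is still running
ssc.awaitTerminationOrTimeout(60 * 1000)
// stop streaming but keep the SparkContext shared with other paragraphs
ssc.stop(stopSparkContext = false, stopGracefully = true)
{code}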