VIVEK NANDLAL MISTRY MISTRY created ZEPPELIN-4888:
-----------------------------------------------------

             Summary: Caused by: org.apache.spark.SparkException: Writing job 
aborted.
                 Key: ZEPPELIN-4888
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-4888
             Project: Zeppelin
          Issue Type: Bug
          Components: spark
    Affects Versions: 0.9.0
            Reporter: VIVEK NANDLAL MISTRY MISTRY
             Fix For: 0.9.0


We have a Kubernetes cluster running Spark (2.4.6) and Zeppelin (0.9.0). 
Running the script below with the dependency 
(org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6) produces the following error. 

We also tried running this script with Spark 2.4.3 and Zeppelin 0.8.1, but 
got the same issue. When the script is run in the Spark interactive shell, it 
works perfectly fine.

Please advise: what configuration is pending or missing in Zeppelin?

*************************Script*******************************

%spark
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets


val jaas2 = "org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"XLSYWHP3RVZPUDDH\" 
password=\"64wrFdqmCsVDpJYmID3fNy5SmnNr4IyzsGASe96M0atgRQHNoCLPcSO+4ozGJCFE\";"
val wschema = new StructType().add("epic", "string").add("ofrClose", 
"float").add("ofrOpen", "float")

val ds = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
"pkc-4ygn6.europe-west3.gcp.confluent.cloud:9092").option("kafka.sasl.jaas.config",jaas2).option("kafka.security.protocol","SASL_SSL").option("kafka.ssl.endpoint.identification.algorithm","https").option("kafka.sasl.mechanism","PLAIN").option("startingOffsets",
 "earliest").option("subscribe", "raw-data-feed").load()

val ds1 = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)").as[(String, String)]

val structuredDS = ds1.select(from_json(col("value"), 
wschema).as("payload")).select("payload.*")

println("WriteStream")
val query = structuredDS.writeStream.format("console").option("truncate", 
"false").start()

 

 

 

************************************Error***********************************

 

 

%spark
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets


val jaas2 = "org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"XLSYWHP3RVZPUDDH\" 
password=\"64wrFdqmCsVDpJYmID3fNy5SmnNr4IyzsGASe96M0atgRQHNoCLPcSO+4ozGJCFE\";"
val wschema = new StructType().add("epic", "string").add("ofrClose", 
"float").add("ofrOpen", "float")

val ds = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
"pkc-4ygn6.europe-west3.gcp.confluent.cloud:9092").option("kafka.sasl.jaas.config",jaas2).option("kafka.security.protocol","SASL_SSL").option("kafka.ssl.endpoint.identification.algorithm","https").option("kafka.sasl.mechanism","PLAIN").option("startingOffsets",
 "earliest").option("subscribe", "raw-data-feed").load()

val ds1 = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)").as[(String, String)]

val structuredDS = ds1.select(from_json(col("value"), 
wschema).as("payload")).select("payload.*")

println("WriteStream")
val query = structuredDS.writeStream.format("console").option("truncate", 
"false").start()

println("Start")
query.awaitTermination()

println("Start")
query.awaitTermination()



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to