VIVEK NANDLAL MISTRY MISTRY created ZEPPELIN-4888:
-----------------------------------------------------
Summary: Caused by: org.apache.spark.SparkException: Writing job
aborted.
Key: ZEPPELIN-4888
URL: https://issues.apache.org/jira/browse/ZEPPELIN-4888
Project: Zeppelin
Issue Type: Bug
Components: spark
Affects Versions: 0.9.0
Reporter: VIVEK NANDLAL MISTRY MISTRY
Fix For: 0.9.0
We have a Kubernetes Cluster for Spark (2.4.6) and Zeppelin (0.9.0). While
running the script below with the dependency
(org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6), we get the following error.
We had also tried running this script with Spark (2.4.3) and Zeppelin (0.8.1) but
got the same issue. When running this script in the Spark interactive shell, it
works perfectly fine.
Please advise: what configuration is pending or missing in Zeppelin?
*************************Script*******************************
%spark
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets
val jaas2 = "org.apache.kafka.common.security.plain.PlainLoginModule required
username=\"XLSYWHP3RVZPUDDH\"
password=\"64wrFdqmCsVDpJYmID3fNy5SmnNr4IyzsGASe96M0atgRQHNoCLPcSO+4ozGJCFE\";"
val wschema = new StructType().add("epic", "string").add("ofrClose",
"float").add("ofrOpen", "float")
val ds = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"pkc-4ygn6.europe-west3.gcp.confluent.cloud:9092").option("kafka.sasl.jaas.config",jaas2).option("kafka.security.protocol","SASL_SSL").option("kafka.ssl.endpoint.identification.algorithm","https").option("kafka.sasl.mechanism","PLAIN").option("startingOffsets",
"earliest").option("subscribe", "raw-data-feed").load()
val ds1 = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS
STRING)").as[(String, String)]
val structuredDS = ds1.select(from_json(col("value"),
wschema).as("payload")).select("payload.*")
println("WriteStream")
val query = structuredDS.writeStream.format("console").option("truncate",
"false").start()
************************************Error***********************************
%spark
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets
val jaas2 = "org.apache.kafka.common.security.plain.PlainLoginModule required
username=\"XLSYWHP3RVZPUDDH\"
password=\"64wrFdqmCsVDpJYmID3fNy5SmnNr4IyzsGASe96M0atgRQHNoCLPcSO+4ozGJCFE\";"
val wschema = new StructType().add("epic", "string").add("ofrClose",
"float").add("ofrOpen", "float")
val ds = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"pkc-4ygn6.europe-west3.gcp.confluent.cloud:9092").option("kafka.sasl.jaas.config",jaas2).option("kafka.security.protocol","SASL_SSL").option("kafka.ssl.endpoint.identification.algorithm","https").option("kafka.sasl.mechanism","PLAIN").option("startingOffsets",
"earliest").option("subscribe", "raw-data-feed").load()
val ds1 = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS
STRING)").as[(String, String)]
val structuredDS = ds1.select(from_json(col("value"),
wschema).as("payload")).select("payload.*")
println("WriteStream")
val query = structuredDS.writeStream.format("console").option("truncate",
"false").start()
println("Start")
query.awaitTermination()
println("Start")
query.awaitTermination()
--
This message was sent by Atlassian Jira
(v8.3.4#803005)