I am using 2.11.8. Thanks On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com > wrote:
> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some > known race conditions in reflection and the Scala community doesn't have > plan to fix it (http://docs.scala-lang.org/overviews/reflection/thread- > safety.html) AFAIK, the only way to fix it is upgrading to Scala 2.11. > > On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande < > deshpandesh...@gmail.com> wrote: > >> I am using protobuf to encode. This may not be related to the new release >> issue.... >> >> Exception in thread "main" scala.ScalaReflectionException: <none> is not >> a term >> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199) >> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm( >> Symbols.scala:84) >> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc >> tParams(ScalaReflection.scala:811) >> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara >> ms(ScalaReflection.scala:39) >> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst >> ructorParameters(ScalaReflection.scala:800) >> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo >> rParameters(ScalaReflection.scala:39) >> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >> ark$sql$catalyst$ScalaReflection$$serializerFor( >> ScalaReflection.scala:582) >> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >> ark$sql$catalyst$ScalaReflection$$serializerFor( >> ScalaReflection.scala:460) >> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app >> ly(ScalaReflection.scala:592) >> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app >> ly(ScalaReflection.scala:583) >> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr >> aversableLike.scala:252) >> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr >> aversableLike.scala:252) >> at scala.collection.immutable.List.foreach(List.scala:381) >> at scala.collection.TraversableLike$class.flatMap(TraversableLi >> ke.scala:252) >> at scala.collection.immutable.List.flatMap(List.scala:344) >> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >> ark$sql$catalyst$ScalaReflection$$serializerFor( >> ScalaReflection.scala:583) >> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor >> (ScalaReflection.scala:425) >> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap >> ply(ExpressionEncoder.scala:61) >> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274) >> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli >> cits.scala:47) >> at PersonConsumer$.main(PersonConsumer.scala:33) >> at PersonConsumer.main(PersonConsumer.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >> ssorImpl.java:62) >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >> thodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) >> >> The following is my code ... >> >> object PersonConsumer { >> import org.apache.spark.rdd.RDD >> import com.trueaccord.scalapb.spark._ >> import org.apache.spark.sql.{SQLContext, SparkSession} >> import com.example.protos.demo._ >> >> def main(args : Array[String]) { >> >> def parseLine(s: String): Person = >> Person.parseFrom( >> org.apache.commons.codec.binary.Base64.decodeBase64(s)) >> >> val spark = SparkSession.builder. >> master("local") >> .appName("spark session example") >> .getOrCreate() >> >> import spark.implicits._ >> >> val ds1 = >> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load() >> >> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String] >> >> val ds3 = ds2.map(str => >> parseLine(str)).createOrReplaceTempView("persons") >> >> val ds4 = spark.sqlContext.sql("select name from persons") >> >> val query = ds4.writeStream >> .outputMode("append") >> .format("console") >> .start() >> query.awaitTermination() >> } >> } >> >> >