I am using 2.11.8. Thanks

On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
> known race conditions in reflection, and the Scala community doesn't have
> a plan to fix them
> (http://docs.scala-lang.org/overviews/reflection/thread-safety.html).
> AFAIK, the only way to fix it is upgrading to Scala 2.11.
>
> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> I am using protobuf to encode. This may not be related to the new release
>> issue....
>>
>> Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
>> at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>> at scala.collection.immutable.List.foreach(List.scala:381)
>> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>> at scala.collection.immutable.List.flatMap(List.scala:344)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
>> at PersonConsumer$.main(PersonConsumer.scala:33)
>> at PersonConsumer.main(PersonConsumer.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>
>> The following is my code ...
>>
>> object PersonConsumer {
>>   import org.apache.spark.rdd.RDD
>>   import com.trueaccord.scalapb.spark._
>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>   import com.example.protos.demo._
>>
>>   def main(args : Array[String]) {
>>
>>     def parseLine(s: String): Person =
>>       Person.parseFrom(
>>         org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>
>>     val spark = SparkSession.builder.
>>       master("local")
>>       .appName("spark session example")
>>       .getOrCreate()
>>
>>     import spark.implicits._
>>
>>     val ds1 = 
>> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>>
>>     val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>
>>     val ds3 = ds2.map(str => 
>> parseLine(str)).createOrReplaceTempView("persons")
>>
>>     val ds4 = spark.sqlContext.sql("select name from persons")
>>
>>     val query = ds4.writeStream
>>       .outputMode("append")
>>       .format("console")
>>       .start()
>>     query.awaitTermination()
>>   }
>> }
>>
>>
>

Reply via email to