Ryan, I just wanted to provide more info. Here is my .proto file, which is the basis for generating the Person class. Thanks.
option java_package = "com.example.protos"; enum Gender { MALE = 1; FEMALE = 2; } message Address { optional string street = 1; optional string city = 2; } message Person { optional string name = 1; optional int32 age = 2; optional Gender gender = 3; repeated string tags = 4; repeated Address addresses = 5; } On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande <deshpandesh...@gmail.com> wrote: > *Thanks for the response. Following is the Person class..* > > // Generated by the Scala Plugin for the Protocol Buffer Compiler. > // Do not edit! > // > // Protofile syntax: PROTO2 > > package com.example.protos.demo > > > > @SerialVersionUID(0L) > final case class Person( > name: scala.Option[String] = None, > age: scala.Option[Int] = None, > gender: scala.Option[com.example.protos.demo.Gender] = None, > tags: scala.collection.Seq[String] = Nil, > addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil > ) extends com.trueaccord.scalapb.GeneratedMessage with > com.trueaccord.scalapb.Message[Person] with > com.trueaccord.lenses.Updatable[Person] { > @transient > private[this] var __serializedSizeCachedValue: Int = 0 > private[this] def __computeSerializedValue(): Int = { > var __size = 0 > if (name.isDefined) { __size += > com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) } > if (age.isDefined) { __size += > com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) } > if (gender.isDefined) { __size += > com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) } > tags.foreach(tags => __size += > com.google.protobuf.CodedOutputStream.computeStringSize(4, tags)) > addresses.foreach(addresses => __size += 1 + > com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize) > + addresses.serializedSize) > __size > } > final override def serializedSize: Int = { > var read = __serializedSizeCachedValue > if (read == 0) { > read = __computeSerializedValue() > __serializedSizeCachedValue = read > } 
> read > } > def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = { > name.foreach { __v => > _output__.writeString(1, __v) > }; > age.foreach { __v => > _output__.writeInt32(2, __v) > }; > gender.foreach { __v => > _output__.writeEnum(3, __v.value) > }; > tags.foreach { __v => > _output__.writeString(4, __v) > }; > addresses.foreach { __v => > _output__.writeTag(5, 2) > _output__.writeUInt32NoTag(__v.serializedSize) > __v.writeTo(_output__) > }; > } > def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): > com.example.protos.demo.Person = { > var __name = this.name > var __age = this.age > var __gender = this.gender > val __tags = (scala.collection.immutable.Vector.newBuilder[String] ++= > this.tags) > val __addresses = > (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address] > ++= this.addresses) > var _done__ = false > while (!_done__) { > val _tag__ = _input__.readTag() > _tag__ match { > case 0 => _done__ = true > case 10 => > __name = Some(_input__.readString()) > case 16 => > __age = Some(_input__.readInt32()) > case 24 => > __gender = > Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum())) > case 34 => > __tags += _input__.readString() > case 42 => > __addresses += > com.trueaccord.scalapb.LiteParser.readMessage(_input__, > com.example.protos.demo.Address.defaultInstance) > case tag => _input__.skipField(tag) > } > } > com.example.protos.demo.Person( > name = __name, > age = __age, > gender = __gender, > tags = __tags.result(), > addresses = __addresses.result() > ) > } > def getName: String = name.getOrElse("") > def clearName: Person = copy(name = None) > def withName(__v: String): Person = copy(name = Some(__v)) > def getAge: Int = age.getOrElse(0) > def clearAge: Person = copy(age = None) > def withAge(__v: Int): Person = copy(age = Some(__v)) > def getGender: com.example.protos.demo.Gender = > gender.getOrElse(com.example.protos.demo.Gender.MALE) > def clearGender: Person = copy(gender 
= None) > def withGender(__v: com.example.protos.demo.Gender): Person = copy(gender > = Some(__v)) > def clearTags = copy(tags = scala.collection.Seq.empty) > def addTags(__vs: String*): Person = addAllTags(__vs) > def addAllTags(__vs: TraversableOnce[String]): Person = copy(tags = tags > ++ __vs) > def withTags(__v: scala.collection.Seq[String]): Person = copy(tags = __v) > def clearAddresses = copy(addresses = scala.collection.Seq.empty) > def addAddresses(__vs: com.example.protos.demo.Address*): Person = > addAllAddresses(__vs) > def addAllAddresses(__vs: > TraversableOnce[com.example.protos.demo.Address]): Person = copy(addresses = > addresses ++ __vs) > def withAddresses(__v: > scala.collection.Seq[com.example.protos.demo.Address]): Person = > copy(addresses = __v) > def getField(__field: com.google.protobuf.Descriptors.FieldDescriptor): > scala.Any = { > __field.getNumber match { > case 1 => name.getOrElse(null) > case 2 => age.getOrElse(null) > case 3 => gender.map(_.valueDescriptor).getOrElse(null) > case 4 => tags > case 5 => addresses > } > } > override def toString: String = > com.trueaccord.scalapb.TextFormat.printToUnicodeString(this) > def companion = com.example.protos.demo.Person > } > > object Person extends > com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] > { > implicit def messageCompanion: > com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person] > = this > def fromFieldsMap(__fieldsMap: > scala.collection.immutable.Map[com.google.protobuf.Descriptors.FieldDescriptor, > scala.Any]): com.example.protos.demo.Person = { > require(__fieldsMap.keys.forall(_.getContainingType() == descriptor), > "FieldDescriptor does not match message type.") > val __fields = descriptor.getFields > com.example.protos.demo.Person( > __fieldsMap.get(__fields.get(0)).asInstanceOf[scala.Option[String]], > __fieldsMap.get(__fields.get(1)).asInstanceOf[scala.Option[Int]], > > 
__fieldsMap.get(__fields.get(2)).asInstanceOf[scala.Option[com.google.protobuf.Descriptors.EnumValueDescriptor]].map(__e > => com.example.protos.demo.Gender.fromValue(__e.getNumber)), > __fieldsMap.getOrElse(__fields.get(3), > Nil).asInstanceOf[scala.collection.Seq[String]], > __fieldsMap.getOrElse(__fields.get(4), > Nil).asInstanceOf[scala.collection.Seq[com.example.protos.demo.Address]] > ) > } > def descriptor: com.google.protobuf.Descriptors.Descriptor = > DemoProto.descriptor.getMessageTypes.get(1) > def messageCompanionForField(__field: > com.google.protobuf.Descriptors.FieldDescriptor): > com.trueaccord.scalapb.GeneratedMessageCompanion[_] = { > require(__field.getContainingType() == descriptor, "FieldDescriptor does > not match message type.") > var __out: com.trueaccord.scalapb.GeneratedMessageCompanion[_] = null > __field.getNumber match { > case 5 => __out = com.example.protos.demo.Address > } > __out > } > def enumCompanionForField(__field: > com.google.protobuf.Descriptors.FieldDescriptor): > com.trueaccord.scalapb.GeneratedEnumCompanion[_] = { > require(__field.getContainingType() == descriptor, "FieldDescriptor does > not match message type.") > __field.getNumber match { > case 3 => com.example.protos.demo.Gender > } > } > lazy val defaultInstance = com.example.protos.demo.Person( > ) > implicit class PersonLens[UpperPB](_l: com.trueaccord.lenses.Lens[UpperPB, > com.example.protos.demo.Person]) extends > com.trueaccord.lenses.ObjectLens[UpperPB, com.example.protos.demo.Person](_l) > { > def name: com.trueaccord.lenses.Lens[UpperPB, String] = > field(_.getName)((c_, f_) => c_.copy(name = Some(f_))) > def optionalName: com.trueaccord.lenses.Lens[UpperPB, > scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_)) > def age: com.trueaccord.lenses.Lens[UpperPB, Int] = field(_.getAge)((c_, > f_) => c_.copy(age = Some(f_))) > def optionalAge: com.trueaccord.lenses.Lens[UpperPB, scala.Option[Int]] = > field(_.age)((c_, f_) => c_.copy(age = f_)) 
> def gender: com.trueaccord.lenses.Lens[UpperPB, > com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) => > c_.copy(gender = Some(f_))) > def optionalGender: com.trueaccord.lenses.Lens[UpperPB, > scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_, f_) => > c_.copy(gender = f_)) > def tags: com.trueaccord.lenses.Lens[UpperPB, > scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags = f_)) > def addresses: com.trueaccord.lenses.Lens[UpperPB, > scala.collection.Seq[com.example.protos.demo.Address]] = > field(_.addresses)((c_, f_) => c_.copy(addresses = f_)) > } > final val NAME_FIELD_NUMBER = 1 > final val AGE_FIELD_NUMBER = 2 > final val GENDER_FIELD_NUMBER = 3 > final val TAGS_FIELD_NUMBER = 4 > final val ADDRESSES_FIELD_NUMBER = 5 > } > > > On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu < > shixi...@databricks.com> wrote: > >> Could you provide the Person class? >> >> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande < >> deshpandesh...@gmail.com> wrote: >> >>> I am using 2.11.8. Thanks >>> >>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu < >>> shixi...@databricks.com> wrote: >>> >>>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has >>>> some known race conditions in reflection and the Scala community doesn't >>>> have plans to fix them >>>> (http://docs.scala-lang.org/overviews/reflection/thread-safety.html). >>>> AFAIK, the only way to fix it is upgrading to Scala 2.11. >>>> >>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande < >>>> deshpandesh...@gmail.com> wrote: >>>> >>>>> I am using protobuf to encode. This may not be related to the new >>>>> release issue.... 
>>>>> >>>>> Exception in thread "main" scala.ScalaReflectionException: <none> is >>>>> not a term >>>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199) >>>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(S >>>>> ymbols.scala:84) >>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc >>>>> tParams(ScalaReflection.scala:811) >>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara >>>>> ms(ScalaReflection.scala:39) >>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst >>>>> ructorParameters(ScalaReflection.scala:800) >>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo >>>>> rParameters(ScalaReflection.scala:39) >>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>>>> ion.scala:582) >>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>>>> ion.scala:460) >>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app >>>>> ly(ScalaReflection.scala:592) >>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app >>>>> ly(ScalaReflection.scala:583) >>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr >>>>> aversableLike.scala:252) >>>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr >>>>> aversableLike.scala:252) >>>>> at scala.collection.immutable.List.foreach(List.scala:381) >>>>> at scala.collection.TraversableLike$class.flatMap(TraversableLi >>>>> ke.scala:252) >>>>> at scala.collection.immutable.List.flatMap(List.scala:344) >>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp >>>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect >>>>> ion.scala:583) >>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor >>>>> (ScalaReflection.scala:425) >>>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap 
>>>>> ply(ExpressionEncoder.scala:61) >>>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274) >>>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli >>>>> cits.scala:47) >>>>> at PersonConsumer$.main(PersonConsumer.scala:33) >>>>> at PersonConsumer.main(PersonConsumer.scala) >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >>>>> ssorImpl.java:62) >>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >>>>> thodAccessorImpl.java:43) >>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.j >>>>> ava:147) >>>>> >>>>> The following is my code ... >>>>> >>>>> object PersonConsumer { >>>>> import org.apache.spark.rdd.RDD >>>>> import com.trueaccord.scalapb.spark._ >>>>> import org.apache.spark.sql.{SQLContext, SparkSession} >>>>> import com.example.protos.demo._ >>>>> >>>>> def main(args : Array[String]) { >>>>> >>>>> def parseLine(s: String): Person = >>>>> Person.parseFrom( >>>>> org.apache.commons.codec.binary.Base64.decodeBase64(s)) >>>>> >>>>> val spark = SparkSession.builder. >>>>> master("local") >>>>> .appName("spark session example") >>>>> .getOrCreate() >>>>> >>>>> import spark.implicits._ >>>>> >>>>> val ds1 = >>>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load() >>>>> >>>>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String] >>>>> >>>>> val ds3 = ds2.map(str => >>>>> parseLine(str)).createOrReplaceTempView("persons") >>>>> >>>>> val ds4 = spark.sqlContext.sql("select name from persons") >>>>> >>>>> val query = ds4.writeStream >>>>> .outputMode("append") >>>>> .format("console") >>>>> .start() >>>>> query.awaitTermination() >>>>> } >>>>> } >>>>> >>>>> >>>> >>> >> >