Have you tried the native CSV reader (in Spark 2) or the Databricks spark-csv package (in 1.6)?
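A sketch of that approach, assuming Spark 2.x and the path and column names from the thread below (not tested against a live cluster):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("csv-with-schema").getOrCreate()

// Declare the schema up front; the CSV reader then casts each column
// instead of leaving everything as String.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age",  IntegerType, nullable = true),
  StructField("year", IntegerType, nullable = true)
))

// Spark 2.x built-in CSV source. (On 1.6, use the spark-csv package via
// format("com.databricks.spark.csv") instead.)
val peopleDF = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE") // malformed fields become null instead of failing
  .csv("/sourcedata/test/test*")

peopleDF.show()
```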
If your data is in a CSV-like format it will load directly into a DataFrame. It's also possible that some of your rows have inconsistent types.

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com

On Thu, Jan 12, 2017 at 1:52 AM, lk_spark <lk_sp...@163.com> wrote:
> I have tried this:
>
>     val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
>     val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
>       val ab = ArrayBuffer[Any]()
>       for (i <- 0 until schemaType.length) {
>         if (schemaType(i).equalsIgnoreCase("int")) {
>           ab += attributes(i).toInt
>         } else if (schemaType(i).equalsIgnoreCase("long")) {
>           ab += attributes(i).toLong
>         } else {
>           ab += attributes(i)
>         }
>       }
>       Row(ab.toArray)
>     })
>
>     val peopleDF = spark.createDataFrame(rowRDD, schema)
>     peopleDF.show
>
> I got this error:
>
>     Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid external type for schema of string
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>       at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
>
> All the fields became Any; what should I do?
>
> 2017-01-12
> ------------------------------
> lk_spark
> ------------------------------
>
> *From:* "lk_spark" <lk_sp...@163.com>
> *Sent:* 2017-01-12 14:38
> *Subject:* Re: Re: how to change datatype by useing StructType
> *To:* "ayan guha" <guha.a...@gmail.com>, "user.spark" <user@spark.apache.org>
> *Cc:*
>
> Yes, the field year is in my data:
>
>     kevin,30,2016
>     shen,30,2016
>     kai,33,2016
>     wei,30,2016
>
> This will not work:
>
>     val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
>       Row(attributes(0), attributes(1), attributes(2)))
>
> but I need to read the data in a configurable way.
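The `[Ljava.lang.Object;` error in the quoted code comes from `Row(ab.toArray)`: `Row.apply` takes varargs (`Any*`), so the whole array becomes a single field instead of one field per element. With Spark, the fix is `Row.fromSeq(ab)` or `Row(ab: _*)`. A plain-Scala sketch of the varargs pitfall, with `mkRow` as a hypothetical stand-in for Spark's `Row.apply`:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for org.apache.spark.sql.Row.apply(Any*):
// collects its varargs into the row's field values.
def mkRow(values: Any*): Seq[Any] = values.toSeq

val ab = ArrayBuffer[Any]("kevin", 30, 2016)

val wrong = mkRow(ab.toArray) // one field, whose value is the whole array
val right = mkRow(ab: _*)     // three fields, matching a 3-column schema

println(wrong.length) // 1
println(right.length) // 3
```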
> 2017-01-12
> ------------------------------
> lk_spark
> ------------------------------
>
> *From:* ayan guha <guha.a...@gmail.com>
> *Sent:* 2017-01-12 14:34
> *Subject:* Re: how to change datatype by useing StructType
> *To:* "lk_spark" <lk_sp...@163.com>, "user.spark" <user@spark.apache.org>
> *Cc:*
>
> Do you have year in your data?
>
> On Thu, 12 Jan 2017 at 5:24 pm, lk_spark <lk_sp...@163.com> wrote:
>> hi, all
>>
>> I have a txt file, and I want to process it as a DataFrame.
>>
>> The data looks like this:
>>
>>     name1,30
>>     name2,18
>>
>>     val schemaString = "name age year"
>>     val xMap = new scala.collection.mutable.HashMap[String, DataType]()
>>     xMap.put("name", StringType)
>>     xMap.put("age", IntegerType)
>>     xMap.put("year", IntegerType)
>>     val fields = schemaString.split(" ").map(fieldName =>
>>       StructField(fieldName, xMap.get(fieldName).get, nullable = true))
>>     val schema = StructType(fields)
>>
>>     val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
>>     //spark.read.schema(schema).text("/sourcedata/test/test*")
>>
>>     val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
>>       Row(attributes(0), attributes(1)))
>>
>>     // Apply the schema to the RDD
>>     val peopleDF = spark.createDataFrame(rowRDD, schema)
>>
>> But when I write it to a table or show it, I get this error:
>>
>>     Caused by: java.lang.RuntimeException: Error while encoding:
>>     java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
>>     if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class
>>     org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>>     validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>>     org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#1
>>     +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class
>>     org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>>     validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>>     org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
>>
>> If I change my code like this, it works:
>>
>>     val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
>>       Row(attributes(0), attributes(1).toInt))
>>
>> but this is not a good idea.
>>
>> 2017-01-12
>> ------------------------------
>> lk_spark
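A middle ground between the hard-coded `.toInt` above and the ArrayBuffer version earlier in the thread is to drive the conversion from the configured type names, then hand each converted line to `Row.fromSeq` and `spark.createDataFrame`. A self-contained, Spark-free sketch of just the conversion step (`convertLine` is a hypothetical helper name):

```scala
// Convert one CSV line according to a configurable list of type names,
// mirroring the "name age year" / string-int-int setup from the thread.
// Unrecognized type names fall through as String.
def convertLine(line: String, typeNames: Seq[String]): Seq[Any] =
  line.split(",").toList.zip(typeNames).map {
    case (v, t) if t.equalsIgnoreCase("int")  => v.trim.toInt
    case (v, t) if t.equalsIgnoreCase("long") => v.trim.toLong
    case (v, _)                               => v
  }

val converted = convertLine("kevin,30,2016", Seq("string", "int", "int"))
// In Spark this would become: Row.fromSeq(converted)
println(converted) // prints List(kevin, 30, 2016)
```

This keeps the per-type logic in one place, so adding e.g. a "double" case touches a single pattern match rather than every job.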