Have you tried the native CSV reader (in Spark 2) or the Databricks CSV
reader (in 1.6)?
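
For example, with an explicit schema it would look roughly like this (a
minimal sketch; the path and the name/age/year columns are taken from the
quoted code below):

    import org.apache.spark.sql.types._

    // The same schema the quoted code builds by hand.
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("age",  IntegerType, nullable = true),
      StructField("year", IntegerType, nullable = true)
    ))

    // Spark 2.x: CSV support is built into DataFrameReader.
    val df = spark.read.schema(schema).csv("/sourcedata/test/test*")

    // Spark 1.6 with the Databricks spark-csv package instead:
    // val df = sqlContext.read
    //   .format("com.databricks.spark.csv")
    //   .schema(schema)
    //   .load("/sourcedata/test/test*")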

If your data is in a CSV-like format, either reader will load it directly
into a DataFrame. It's also possible you have some rows where the types are
inconsistent.
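
If some rows really are malformed, the reader's mode option controls what
happens to them (again just a sketch; these option values exist in both the
2.x reader and spark-csv):

    // DROPMALFORMED skips rows that don't fit the schema;
    // PERMISSIVE (the default) keeps them and nulls out the bad fields.
    val cleaned = spark.read
      .schema(schema)
      .option("mode", "DROPMALFORMED")
      .csv("/sourcedata/test/test*")

And if you do stay with the hand-rolled RDD approach from your last mail:
Row takes its fields as varargs, so Row(ab.toArray) builds a one-column row
whose single value is an Object[], which is exactly what the
"[Ljava.lang.Object; is not a valid external type" error below is
complaining about. Expanding the buffer should fix it:

    Row.fromSeq(ab)   // or equivalently: Row(ab: _*)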

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Thu, Jan 12, 2017 at 1:52 AM, lk_spark <lk_sp...@163.com> wrote:

> I have tried it like this:
>
>       val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
>       val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
>         val ab = ArrayBuffer[Any]()
>         for (i <- 0 until schemaType.length) {
>           if (schemaType(i).equalsIgnoreCase("int")) {
>             ab += attributes(i).toInt
>           } else if (schemaType(i).equalsIgnoreCase("long")) {
>             ab += attributes(i).toLong
>           } else {
>             ab += attributes(i)
>           }
>         }
>         Row(ab.toArray)
>       })
>
>       val peopleDF = spark.createDataFrame(rowRDD, schema)
>       peopleDF.show
>
> I got this error:
>
>      Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid external type for schema of string
>        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
>        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
>
> All the fields end up typed as Any; what should I do?
>
>
>
> 2017-01-12
> ------------------------------
> lk_spark
> ------------------------------
>
> *From:* "lk_spark" <lk_sp...@163.com>
> *Sent:* 2017-01-12 14:38
> *Subject:* Re: Re: how to change datatype by useing StructType
> *To:* "ayan guha" <guha.a...@gmail.com>, "user.spark" <user@spark.apache.org>
> *Cc:*
>
> Yes, the field year is in my data:
>
> data:
>   kevin,30,2016
>   shen,30,2016
>   kai,33,2016
>   wei,30,2016
>
> This will not work:
>    val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
>      Row(attributes(0), attributes(1), attributes(2)))
> but I need to read the data in a configurable way.
> 2017-01-12
> ------------------------------
> lk_spark
> ------------------------------
>
> *From:* ayan guha <guha.a...@gmail.com>
> *Sent:* 2017-01-12 14:34
> *Subject:* Re: how to change datatype by useing StructType
> *To:* "lk_spark" <lk_sp...@163.com>, "user.spark" <user@spark.apache.org>
> *Cc:*
>
> Do you have year in your data?
> On Thu, 12 Jan 2017 at 5:24 pm, lk_spark <lk_sp...@163.com> wrote:
>
>> hi, all
>>
>>     I have a txt file, and I want to process it as a DataFrame.
>>
>>     The data looks like this:
>>
>>        name1,30
>>        name2,18
>>
>>     val schemaString = "name age year"
>>
>>     val xMap = new scala.collection.mutable.HashMap[String, DataType]()
>>     xMap.put("name", StringType)
>>     xMap.put("age", IntegerType)
>>     xMap.put("year", IntegerType)
>>
>>     val fields = schemaString.split(" ").map(fieldName =>
>>       StructField(fieldName, xMap.get(fieldName).get, nullable = true))
>>     val schema = StructType(fields)
>>
>>     val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
>>     // spark.read.schema(schema).text("/sourcedata/test/test*")
>>
>>     val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
>>       Row(attributes(0), attributes(1)))
>>
>>     // Apply the schema to the RDD
>>     val peopleDF = spark.createDataFrame(rowRDD, schema)
>>
>>     But when I write it to a table or show it, I get this error:
>>
>>    Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
>>    if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#1
>>    +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
>>
>>    If I change my code like this, it works:
>>
>>    val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
>>      Row(attributes(0), attributes(1).toInt))
>>
>>    but this is not a good approach.
>>
>> 2017-01-12
>> ------------------------------
>> lk_spark
>
