[ 
https://issues.apache.org/jira/browse/SPARK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14729563#comment-14729563
 ] 

Randy Gelhausen commented on SPARK-10357:
-----------------------------------------

Regardless of what the spark-csv package does, I'm using the DataFrame API 
itself to drop columns. Spark SQL still trying to access those columns looks 
like a problem with the DataFrame toDF method. Am I reading the API wrong? If 
not, could the docs be updated to say that the behavior of these methods 
depends on the underlying read format?

val raw = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("DROPMALFORMED", "true").load(input)
val columns = raw.columns.map(x => x.replaceAll(" ", "_"))
raw.toDF(columns:_*).registerTempTable(table)
val clean = sqlContext.sql("select " + columns.filter(x => x.length() > 0 && x != " ").mkString(", ") + " from " + table)
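
The stack trace quoted below shows the exception being raised by 
CSVFormat.validate, reached from CsvRelation.buildScan, so the duplicate '' 
header entries are rejected when the file is scanned, whichever columns the 
query selects. One possible workaround, sketched here under assumptions, is to 
give every header field a unique, non-empty name before the file reaches the 
reader. HeaderCleanup, sanitizeHeader and the "_c<index>" placeholder names are 
hypothetical and are not part of Spark or spark-csv. The sketch uses only the 
Scala standard library and has not been run against spark-csv.

```scala
import java.util.regex.Pattern
import scala.collection.mutable

// Hypothetical helper for illustration; not part of Spark or spark-csv.
object HeaderCleanup {

  /** Returns one unique, non-empty column name per field of a raw CSV header line. */
  def sanitizeHeader(headerLine: String, delimiter: String = ","): Seq[String] = {
    // A negative limit keeps trailing empty fields, such as the ",,," that
    // ends the sample header in this issue.
    val rawNames = headerLine.split(Pattern.quote(delimiter), -1).toSeq

    // Replace whitespace runs with "_" and name blank fields by position.
    // The "_c<index>" scheme is an assumption made for this sketch.
    val named = rawNames.zipWithIndex.map { case (name, idx) =>
      val cleaned = name.trim.replaceAll("\\s+", "_")
      if (cleaned.isEmpty) s"_c$idx" else cleaned
    }

    // Add a numeric suffix until each name is unused, so no entry repeats.
    val used = mutable.Set.empty[String]
    named.map { name =>
      var candidate = name
      var n = 1
      while (!used.add(candidate)) {
        candidate = s"${name}_$n"
        n += 1
      }
      candidate
    }
  }
}
```

Applied to the first line of the input file, for example by writing 
sanitizeHeader(firstLine).mkString(",") back as the new header before calling 
load(input), this would leave the reader with no duplicate entries to reject. 
Whether that is enough to avoid the exception in a given spark-csv version is 
an assumption that would need to be checked.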

> DataFrames unable to drop unwanted columns
> ------------------------------------------
>
>                 Key: SPARK-10357
>                 URL: https://issues.apache.org/jira/browse/SPARK-10357
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.1
>            Reporter: Randy Gelhausen
>
> spark-csv seems to be exposing an issue with DataFrame's inability to drop 
> unwanted columns.
> Related GitHub issue: https://github.com/databricks/spark-csv/issues/61
> My data (with header) looks like:
> MI_PRINX,offense_id,rpt_date,occur_date,occur_time,poss_date,poss_time,beat,apt_office_prefix,apt_office_num,location,MinOfucr,MinOfibr_code,dispo_code,MaxOfnum_victims,Shift,Avg Day,loc_type,UC2 Literal,neighborhood,npu,x,y,,,
> 934782,90360664,2/5/2009,2/3/2009,13:50:00,2/3/2009,15:00:00,305,NULL,NULL,55 MCDONOUGH BLVD SW,670,2308,NULL,1,Day,Tue,35,LARCENY-NON VEHICLE,South Atlanta,Y,-84.38654,33.72024,,,
> 934783,90370891,2/6/2009,2/6/2009,8:50:00,2/6/2009,10:45:00,502,NULL,NULL,464 ANSLEY WALK TER NW,640,2305,NULL,1,Day,Fri,18,LARCENY-FROM VEHICLE,Ansley Park,E,-84.37276,33.79685,,,
> Despite using sqlContext to remove columns from the DataFrame (I also tried 
> the programmatic raw.select, with the same result), attempts to operate on 
> it fail.
> Snippet:
>     // Read CSV file, clean field names
>     val raw = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("DROPMALFORMED", "true").load(input)
>     val columns = raw.columns.map(x => x.replaceAll(" ", "_"))
>     raw.toDF(columns:_*).registerTempTable(table)
>     val clean = sqlContext.sql("select " + columns.filter(x => x.length() > 0 && x != " ").mkString(", ") + " from " + table)
>     System.err.println(clean.schema)
>     System.err.println(clean.columns.mkString(","))
>     System.err.println(clean.take(1).mkString("|"))
> StackTrace:
> {code}
> 15/08/30 18:23:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 
> 0.0 (TID 0, docker.dev, NODE_LOCAL, 1482 bytes)
> 15/08/30 18:23:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in 
> memory on docker.dev:58272 (size: 1811.0 B, free: 530.0 MB)
> 15/08/30 18:23:14 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
> memory on docker.dev:58272 (size: 21.9 KB, free: 530.0 MB)
> 15/08/30 18:23:15 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 
> 0.0 (TID 0) in 1350 ms on docker.dev (1/1)
> 15/08/30 18:23:15 INFO scheduler.DAGScheduler: ResultStage 0 (take at 
> CsvRelation.scala:174) finished in 1.354 s
> 15/08/30 18:23:15 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose 
> tasks have all completed, from pool 
> 15/08/30 18:23:15 INFO scheduler.DAGScheduler: Job 0 finished: take at 
> CsvRelation.scala:174, took 1.413674 s
> StructType(StructField(MI_PRINX,StringType,true), 
> StructField(offense_id,StringType,true), 
> StructField(rpt_date,StringType,true), 
> StructField(occur_date,StringType,true), 
> StructField(occur_time,StringType,true), 
> StructField(poss_date,StringType,true), 
> StructField(poss_time,StringType,true), StructField(beat,StringType,true), 
> StructField(apt_office_prefix,StringType,true), 
> StructField(apt_office_num,StringType,true), 
> StructField(location,StringType,true), StructField(MinOfucr,StringType,true), 
> StructField(MinOfibr_code,StringType,true), 
> StructField(dispo_code,StringType,true), 
> StructField(MaxOfnum_victims,StringType,true), 
> StructField(Shift,StringType,true), StructField(Avg_Day,StringType,true), 
> StructField(loc_type,StringType,true), 
> StructField(UC2_Literal,StringType,true), 
> StructField(neighborhood,StringType,true), StructField(npu,StringType,true), 
> StructField(x,StringType,true), StructField(y,StringType,true))
> MI_PRINX,offense_id,rpt_date,occur_date,occur_time,poss_date,poss_time,beat,apt_office_prefix,apt_office_num,location,MinOfucr,MinOfibr_code,dispo_code,MaxOfnum_victims,Shift,Avg_Day,loc_type,UC2_Literal,neighborhood,npu,x,y
> 15/08/30 18:23:16 INFO storage.MemoryStore: ensureFreeSpace(232400) called 
> with curMem=259660, maxMem=278019440
> 15/08/30 18:23:16 INFO storage.MemoryStore: Block broadcast_2 stored as 
> values in memory (estimated size 227.0 KB, free 264.7 MB)
> 15/08/30 18:23:16 INFO storage.MemoryStore: ensureFreeSpace(22377) called 
> with curMem=492060, maxMem=278019440
> 15/08/30 18:23:16 INFO storage.MemoryStore: Block broadcast_2_piece0 stored 
> as bytes in memory (estimated size 21.9 KB, free 264.6 MB)
> 15/08/30 18:23:16 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in 
> memory on 172.17.0.19:41088 (size: 21.9 KB, free: 265.1 MB)
> 15/08/30 18:23:16 INFO spark.SparkContext: Created broadcast 2 from textFile 
> at TextFile.scala:30
> Exception in thread "main" java.lang.IllegalArgumentException: The header 
> contains a duplicate entry: '' in [MI_PRINX, offense_id, rpt_date, 
> occur_date, occur_time, poss_date, poss_time, beat, apt_office_prefix, 
> apt_office_num, location, MinOfucr, MinOfibr_code, dispo_code, 
> MaxOfnum_victims, Shift, Avg Day, loc_type, UC2 Literal, neighborhood, npu, 
> x, y, , , ]
>       at org.apache.commons.csv.CSVFormat.validate(CSVFormat.java:770)
>       at org.apache.commons.csv.CSVFormat.<init>(CSVFormat.java:364)
>       at org.apache.commons.csv.CSVFormat.withHeader(CSVFormat.java:882)
>       at com.databricks.spark.csv.CsvRelation.tokenRdd(CsvRelation.scala:84)
>       at com.databricks.spark.csv.CsvRelation.buildScan(CsvRelation.scala:105)
>       at 
> org.apache.spark.sql.sources.DataSourceStrategy$.apply(DataSourceStrategy.scala:101)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>       at 
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:300)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>       at 
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:314)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
>       at 
> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:943)
>       at 
> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:941)
>       at 
> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:947)
>       at 
> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:947)
>       at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
>       at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
>       at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
>       at com.github.randerzander.CSVLoad$.main(CSVLoad.scala:29)
>       at com.github.randerzander.CSVLoad.main(CSVLoad.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>       at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>       at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}


