[
https://issues.apache.org/jira/browse/SPARK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michael Armbrust resolved SPARK-10357.
--------------------------------------
Resolution: Won't Fix
The exception is being thrown by the CSV parsing library used by an
external data source. The data never makes it into Spark SQL, so there is
nothing we can fix on our side.
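Since Commons CSV rejects the header before Spark SQL ever sees a row (the trailing commas produce several columns all named '', which it treats as duplicate entries), one possible workaround is to sanitize the header yourself before assigning column names. A minimal sketch in plain Scala — `HeaderSanitizer` and the generated `_cN` placeholder names are hypothetical, not part of spark-csv:

```scala
object HeaderSanitizer {
  // Turn a raw CSV header line into unique, non-empty column names:
  // blanks become positional placeholders (_c2, _c3, ...), spaces become
  // underscores, and repeated names get a numeric suffix.
  def sanitize(headerLine: String): Seq[String] = {
    val seen = scala.collection.mutable.Map.empty[String, Int]
    // split with limit -1 keeps trailing empty fields, like a CSV parser does
    headerLine.split(",", -1).toSeq.zipWithIndex.map { case (raw, i) =>
      val base = if (raw.trim.isEmpty) s"_c$i" else raw.trim.replaceAll(" ", "_")
      val n = seen.getOrElse(base, 0)
      seen(base) = n + 1
      if (n == 0) base else s"${base}_$n"
    }
  }
}
```

The resulting names could then be applied with something like `df.toDF(names: _*)` after loading the file with `"header" -> "false"` and discarding the first row, so the duplicate-header validation inside the CSV library is never triggered.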
> DataFrames unable to drop unwanted columns
> ------------------------------------------
>
> Key: SPARK-10357
> URL: https://issues.apache.org/jira/browse/SPARK-10357
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.4.1
> Reporter: Randy Gelhausen
>
> spark-csv seems to be exposing an issue with DataFrame's inability to drop
> unwanted columns.
> Related GitHub issue: https://github.com/databricks/spark-csv/issues/61
> My data (with header) looks like:
> {code}
> MI_PRINX,offense_id,rpt_date,occur_date,occur_time,poss_date,poss_time,beat,apt_office_prefix,apt_office_num,location,MinOfucr,MinOfibr_code,dispo_code,MaxOfnum_victims,Shift,Avg Day,loc_type,UC2 Literal,neighborhood,npu,x,y,,,
> 934782,90360664,2/5/2009,2/3/2009,13:50:00,2/3/2009,15:00:00,305,NULL,NULL,55 MCDONOUGH BLVD SW,670,2308,NULL,1,Day,Tue,35,LARCENY-NON VEHICLE,South Atlanta,Y,-84.38654,33.72024,,,
> 934783,90370891,2/6/2009,2/6/2009,8:50:00,2/6/2009,10:45:00,502,NULL,NULL,464 ANSLEY WALK TER NW,640,2305,NULL,1,Day,Fri,18,LARCENY-FROM VEHICLE,Ansley Park,E,-84.37276,33.79685,,,
> {code}
> Despite using sqlContext to remove columns from the DataFrame (I also
> tried the programmatic raw.select, with the same result), attempts to
> operate on it cause failures.
> Snippet:
> {code}
> // Read CSV file, clean field names
> val raw = sqlContext.read.format("com.databricks.spark.csv")
>   .option("header", "true")
>   .option("DROPMALFORMED", "true")
>   .load(input)
> val columns = raw.columns.map(x => x.replaceAll(" ", "_"))
> raw.toDF(columns: _*).registerTempTable(table)
> val clean = sqlContext.sql("select " + columns.filter(x => x.length() > 0 && x != " ").mkString(", ") + " from " + table)
> System.err.println(clean.schema)
> System.err.println(clean.columns.mkString(","))
> System.err.println(clean.take(1).mkString("|"))
> {code}
> StackTrace:
> {code}
> 15/08/30 18:23:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, docker.dev, NODE_LOCAL, 1482 bytes)
> 15/08/30 18:23:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on docker.dev:58272 (size: 1811.0 B, free: 530.0 MB)
> 15/08/30 18:23:14 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on docker.dev:58272 (size: 21.9 KB, free: 530.0 MB)
> 15/08/30 18:23:15 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1350 ms on docker.dev (1/1)
> 15/08/30 18:23:15 INFO scheduler.DAGScheduler: ResultStage 0 (take at CsvRelation.scala:174) finished in 1.354 s
> 15/08/30 18:23:15 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 15/08/30 18:23:15 INFO scheduler.DAGScheduler: Job 0 finished: take at CsvRelation.scala:174, took 1.413674 s
> StructType(StructField(MI_PRINX,StringType,true), StructField(offense_id,StringType,true), StructField(rpt_date,StringType,true), StructField(occur_date,StringType,true), StructField(occur_time,StringType,true), StructField(poss_date,StringType,true), StructField(poss_time,StringType,true), StructField(beat,StringType,true), StructField(apt_office_prefix,StringType,true), StructField(apt_office_num,StringType,true), StructField(location,StringType,true), StructField(MinOfucr,StringType,true), StructField(MinOfibr_code,StringType,true), StructField(dispo_code,StringType,true), StructField(MaxOfnum_victims,StringType,true), StructField(Shift,StringType,true), StructField(Avg_Day,StringType,true), StructField(loc_type,StringType,true), StructField(UC2_Literal,StringType,true), StructField(neighborhood,StringType,true), StructField(npu,StringType,true), StructField(x,StringType,true), StructField(y,StringType,true))
> MI_PRINX,offense_id,rpt_date,occur_date,occur_time,poss_date,poss_time,beat,apt_office_prefix,apt_office_num,location,MinOfucr,MinOfibr_code,dispo_code,MaxOfnum_victims,Shift,Avg_Day,loc_type,UC2_Literal,neighborhood,npu,x,y
> 15/08/30 18:23:16 INFO storage.MemoryStore: ensureFreeSpace(232400) called with curMem=259660, maxMem=278019440
> 15/08/30 18:23:16 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 227.0 KB, free 264.7 MB)
> 15/08/30 18:23:16 INFO storage.MemoryStore: ensureFreeSpace(22377) called with curMem=492060, maxMem=278019440
> 15/08/30 18:23:16 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 21.9 KB, free 264.6 MB)
> 15/08/30 18:23:16 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.17.0.19:41088 (size: 21.9 KB, free: 265.1 MB)
> 15/08/30 18:23:16 INFO spark.SparkContext: Created broadcast 2 from textFile at TextFile.scala:30
> Exception in thread "main" java.lang.IllegalArgumentException: The header contains a duplicate entry: '' in [MI_PRINX, offense_id, rpt_date, occur_date, occur_time, poss_date, poss_time, beat, apt_office_prefix, apt_office_num, location, MinOfucr, MinOfibr_code, dispo_code, MaxOfnum_victims, Shift, Avg Day, loc_type, UC2 Literal, neighborhood, npu, x, y, , , ]
> at org.apache.commons.csv.CSVFormat.validate(CSVFormat.java:770)
> at org.apache.commons.csv.CSVFormat.<init>(CSVFormat.java:364)
> at org.apache.commons.csv.CSVFormat.withHeader(CSVFormat.java:882)
> at com.databricks.spark.csv.CsvRelation.tokenRdd(CsvRelation.scala:84)
> at com.databricks.spark.csv.CsvRelation.buildScan(CsvRelation.scala:105)
> at org.apache.spark.sql.sources.DataSourceStrategy$.apply(DataSourceStrategy.scala:101)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
> at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:300)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
> at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:314)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
> at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:943)
> at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:941)
> at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:947)
> at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:947)
> at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
> at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
> at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
> at com.github.randerzander.CSVLoad$.main(CSVLoad.scala:29)
> at com.github.randerzander.CSVLoad.main(CSVLoad.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
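The duplicate entry in the exception above comes straight from the trailing commas in the sample header: a CSV parser that keeps trailing empty fields sees three extra columns, all named ''. A tiny plain-Scala illustration (the shortened header string here is hypothetical, abbreviated from the sample data):

```scala
// A header ending in ",,," yields empty trailing column names.
val header = "MI_PRINX,offense_id,npu,x,y,,,"

// split with limit -1 keeps trailing empty strings, as a CSV parser does;
// the default split(",") would silently drop them and hide the problem.
val names = header.split(",", -1)
val empties = names.count(_.isEmpty)

println(s"$empties empty column names")  // three duplicates of ''
```

This is why filtering the column list in the SQL projection cannot help: the validation happens when the relation re-reads the raw header during the scan, before any projection is applied.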
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)