Ricky,
You may need to use map instead of flatMap in your case *val rowRDD=sc.textFile("/user/spark/short_model").map(_.split("\\t")).map(p => Row(...))* Thanks! -Terry On Fri, Aug 28, 2015 at 5:08 PM, our...@cnsuning.com <our...@cnsuning.com> wrote: > hi all, > > when using spark sql ,A problem bothering me. > > the codeing as following: > > *val schemaString = > "visitor_id,cust_num,gds_id,l1_gds_group_cd,l4_gds_group_cd,pc_gds_addcart,pc_gds_collect_num,pc_gds_four_page_pv,pc_gds_four_page_time,pc_gds_four_page_fromsearch_pv,pc_gds_four_page_fromlist_pv,pc_gds_four_page_fromrec_pv,pc_gds_four_page_fromcuxiao_pv,pc_four_page_num,pc_group_gds_addcart,pc_group_gds_collect,pc_group_fourpage_pv,pc_group_fourpage_time,pc_visitor_pv,pc_search_pv,pc_list_pv,pc_is_view1,pc_is_view,pc_view_cycle_days,pc_view_days,wap_gds_addcart,wap_gds_collect_num,wap_gds_four_page_pv,wap_gds_four_page_time,wap_gds_four_page_fromsearch_pv,wap_gds_four_page_fromlist_pv,wap_gds_four_page_fromrec_pv,wap_gds_four_page_fromcuxiao_pv,wap_four_page_num,wap_group_gds_addcart,wap_group_gds_collect,wap_group_fourpage_pv,wap_group_fourpage_time,wap_visitor_pv,wap_search_pv,wap_list_pv,wap_is_view1,wap_is_view,wap_view_cycle_days,wap_view_days,app_gds_addcart,app_gds_collect_num,app_gds_four_page_pv,app_gds_four_page_time,app_gds_four_page_fromsearch_pv,app_gds_four_page_fromlist_pv,app_gds_four_page_fromrec_pv,app_gds_four_page_fromcuxiao_pv,app_four_page_num,app_group_gds_addcart,app_group_gds_collect,app_group_fourpage_pv,app_group_fourpage_time,app_visitor_pv,app_search_pv,app_list_pv,app_is_view1,app_is_view,app_view_cycle_days,app_view_days,gds_score_desc,l4_gds_group_rate_n,decision_cycle_days,decision_days,decision_pv,is_order,statis_date"* > //*schemaString.length=72 * > > *import org.apache.spark.sql.Row;* > > *import org.apache.spark.sql.types.{StructType,StructField,StringType};* > > *val schema =StructType( schemaString.split(",").map(fieldName => > StructField(fieldName, StringType, true)))* > > *val > rowRDD=sc.textFile("/user/spark/short_model").flatMap(_.split("\\t")).map(p > => > Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14),p(15),p(16),p(17),p(18),p(19),p(20),p(21),p(22),p(23),p(24),p(25),p(26),p(27),p(28),p(29),p(30),p(31),p(32),p(33),p(34),p(35),p(36),p(37),p(38),p(39),p(40),p(41),p(42),p(43),p(44),p(45),p(46),p(47),p(48),p(49),p(50),p(51),p(52),p(53),p(54),p(55),p(56),p(57),p(58),p(59),p(60),p(61),p(62),p(63),p(64),p(65),p(66),p(67),p(68),p(69),p(70),p(71)))* > > *val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)* > > *peopleDataFrame.registerTempTable("alg")* > > *val results = sqlContext.sql("SELECT count(*) FROM alg")* > > *results.collect()* > > > the error log as following: > > 5/08/28 17:00:54 WARN TaskSetManager: Lost task 56.0 in > stage 9.0 (TID 71, 10.104.74.8): java.lang.StringIndexOutOfBoundsException: > String index out of range: 18 > at java.lang.String.charAt(String.java:658) > at > scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39) > at > $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26) > at > $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130) > at > org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:70) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > > 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.1 in stage 9.0 > (TID 72, 10.104.74.8, NODE_LOCAL, 1415 bytes) > 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.1 in stage 9.0 (TID > 72) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException > (String index out of range: 18) [duplicate 1] > 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.2 in stage 9.0 > (TID 73, 10.104.74.8, NODE_LOCAL, 1415 bytes) > 15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.0 in stage 9.0 (TID > 70) on executor 10.104.74.6: java.lang.StringIndexOutOfBoundsException > (String index out of range: 18) [duplicate 2] > 15/08/28 17:00:54 INFO TaskSetManager: Starting task 45.1 in stage 9.0 > (TID 74, 10.104.74.6, NODE_LOCAL, 1415 bytes) > 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.2 in stage 9.0 (TID > 73) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException > (String index out of range: 18) [duplicate 3] > 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.3 in stage 9.0 > (TID 75, 10.104.74.8, NODE_LOCAL, 1415 bytes) > 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.3 in stage 9.0 (TID > 75) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException > (String index out of range: 18) [duplicate 4] > 15/08/28 17:00:54 ERROR TaskSetManager: Task 56 in stage 9.0 failed 4 > times; aborting job > 15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.1 in stage 9.0 (TID > 74) on executor 10.104.74.6: java.lang.StringIndexOutOfBoundsException > (String index out of range: 18) [duplicate 5] > 15/08/28 17:00:54 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks > have all completed, from pool > 15/08/28 17:00:54 INFO TaskSchedulerImpl: Cancelling stage 9 > 15/08/28 17:00:54 INFO DAGScheduler: ShuffleMapStage 9 (collect at > <console>:31) failed in 0.206 s > 15/08/28 17:00:54 INFO DAGScheduler: Job 6 failed: collect at > <console>:31, took 0.293903 s > org.apache.spark.SparkException: Job aborted due to stage failure: Task 56 > in stage 9.0 failed 4 times, most recent failure: Lost task 56.3 in stage > 9.0 (TID 75, 10.104.74.8): java.lang.StringIndexOutOfBoundsException: > String index out of range: 18 > at java.lang.String.charAt(String.java:658) > at > scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39) > at > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26) > at > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130) > at > org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:70) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > > Driver stacktrace: > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > > > > > > > > ------------------------------ > Ricky Ou(欧 锐) > > 部 门:苏宁云商 IT总部技术支撑研发中心大 数据中心数据平台开发部 > > email : our...@cnsuning.com <14070...@cnsuning.com> > >