Jerry,

Thank you for the note. It sounds like you were able to get further than I have been - any insight? Is it just a Spark 1.4.1 vs. Spark 1.5 difference?
Regards,

Bryan Jeffrey

-----Original Message-----
From: "Jerry Lam" <chiling...@gmail.com>
Sent: 10/28/2015 6:29 PM
To: "Bryan Jeffrey" <bryan.jeff...@gmail.com>
Cc: "Susan Zhang" <suchenz...@gmail.com>; "user" <user@spark.apache.org>
Subject: Re: Spark -- Writing to Partitioned Persistent Table

Hi Bryan,

Did you read the email I sent a few days ago? There are more issues with partitionBy down the road: https://www.mail-archive.com/user@spark.apache.org/msg39512.html

Best Regards,

Jerry

On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:

The second issue I'm seeing is an OOM error when writing partitioned data. I am running Spark 1.4.1, Scala 2.11, and Hadoop 2.6.1, using the Hive libraries packaged with Spark. Spark was compiled using the following:

    mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive -Phive-thriftserver package

Given a case class like the following:

    case class HiveWindowsEvent(
      targetEntity: String,
      targetEntityType: String,
      dateTimeUtc: Timestamp,
      eventid: String,
      eventData: Map[String, String],
      description: String,
      eventRecordId: String,
      level: String,
      machineName: String,
      sequenceNumber: String,
      source: String,
      sourceMachineName: String,
      taskCategory: String,
      user: String,
      machineIp: String,
      additionalData: Map[String, String],
      windowseventtimebin: Long
    )

the command to write data works fine (and when queried via Beeline the data is correct):

    val hc = new HiveContext(sc)
    import hc.implicits._

    val partitioner = new HashPartitioner(5)
    hiveWindowsEvents.foreachRDD(rdd => {
      val eventsDF = rdd.toDF()
      eventsDF
        .write
        .mode(SaveMode.Append)
        .saveAsTable("windows_event9")
    })

Once I add the partitioning (a few partitions - three or less):

    val hc = new HiveContext(sc)
    import hc.implicits._

    val partitioner = new HashPartitioner(5)
    hiveWindowsEvents.foreachRDD(rdd => {
      val eventsDF = rdd.toDF()
      eventsDF
        .write
        .partitionBy("windowseventtimebin")
        .mode(SaveMode.Append)
        .saveAsTable("windows_event9")
    })

I see the following error when
writing to (3) partitions:

    15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.OutOfMemoryError: Java heap space
        at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
        at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
        at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
        at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
        at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
        at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
        at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
        at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
        at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
        at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
        at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
        at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
        at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83)
        at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
        at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:530)
        at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:525)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
        at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:525)
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:262)
        ... 8 more

I have dropped the data input volume so that it is negligible (100 events/second), and I am still seeing this OOM error. I removed the 'map' elements in the case class above after seeing several issues with serializing maps to Parquet, and I'm still seeing the same errors.
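[Editor's note: the stack trace above shows the OOM occurring while a DynamicPartitionWriterContainer constructs a new Parquet writer per partition value, so each task holds one open writer per partition it encounters, and each writer pre-allocates column buffers sized toward the Parquet block size. A back-of-envelope sketch in plain Scala, with illustrative defaults rather than measured values, shows how quickly that adds up:]

```scala
// Rough heap estimate for dynamic-partition Parquet writes.
// Assumption (illustrative, not measured): each open writer buffers up to
// one Parquet block (row group); 128 MB is the parquet-mr default block size.
def estimatedWriterHeapBytes(openPartitionsPerTask: Int,
                             tasksPerExecutor: Int,
                             parquetBlockBytes: Long = 128L * 1024 * 1024): Long =
  openPartitionsPerTask.toLong * tasksPerExecutor * parquetBlockBytes

// Three partition values and four concurrent tasks per executor:
// 3 * 4 * 128 MB = 1536 MB of writer buffers before any other heap use.
val needed = estimatedWriterHeapBytes(3, 4)
println(s"~${needed / (1024 * 1024)} MB of writer buffers")
```

Even with a tiny input rate, the buffer reservation depends on partition count and concurrency, not on data volume, which would be consistent with the OOM persisting at 100 events/second.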
The table (as created automatically by the Spark HiveContext, with the maps removed) has the following:

    0: jdbc:hive2://localhost:10000> describe windows_event9;
    +----------------------+------------+----------+
    |       col_name       | data_type  | comment  |
    +----------------------+------------+----------+
    | targetEntity         | string     |          |
    | targetEntityType     | string     |          |
    | dateTimeUtc          | timestamp  |          |
    | eventid              | string     |          |
    | description          | string     |          |
    | eventRecordId        | string     |          |
    | level                | string     |          |
    | machineName          | string     |          |
    | sequenceNumber       | string     |          |
    | source               | string     |          |
    | sourceMachineName    | string     |          |
    | taskCategory         | string     |          |
    | user                 | string     |          |
    | machineIp            | string     |          |
    | windowseventtimebin  | bigint     |          |
    +----------------------+------------+----------+

    SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
    InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
    OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

This seems like a pretty big bug associated with persistent tables. Am I missing a step somewhere?

Thank you,

Bryan Jeffrey

On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:

[The entire original message is not included.]
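[Editor's note: a mitigation sketch reported to help with similar OOMs, not a verified fix for this thread: shrink parquet-mr's per-writer buffers so that many concurrent partition writers fit in the heap. The property names below are standard Parquet output options readable from the Hadoop configuration; the values are illustrative.]

```
# Smaller Parquet buffers per open writer (values are examples only)
parquet.block.size=33554432    # 32 MB row groups instead of the 128 MB default
parquet.page.size=524288       # 512 KB pages instead of the 1 MB default
```

Another workaround commonly suggested for this class of failure is to repartition the DataFrame on the partition column before writing, so that each task ends up holding at most one open writer at a time.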