Jerry,

Thank you for the note. It sounds like you were able to get further than I 
have - any insight? Is it just a Spark 1.4.1 vs. Spark 1.5 difference?

Regards,

Bryan Jeffrey

-----Original Message-----
From: "Jerry Lam" <chiling...@gmail.com>
Sent: 10/28/2015 6:29 PM
To: "Bryan Jeffrey" <bryan.jeff...@gmail.com>
Cc: "Susan Zhang" <suchenz...@gmail.com>; "user" <user@spark.apache.org>
Subject: Re: Spark -- Writing to Partitioned Persistent Table

Hi Bryan,


Did you read the email I sent a few days ago? There are more issues with 
partitionBy down the road: 
https://www.mail-archive.com/user@spark.apache.org/msg39512.html


Best Regards,


Jerry


On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:


The second issue I'm seeing is an OOM error when writing partitioned data.  I 
am running Spark 1.4.1, Scala 2.11, and Hadoop 2.6.1, using the Hive libraries 
packaged with Spark.  Spark was compiled using the following:

    mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive -Phive-thriftserver package


Given a case class like the following:


case class HiveWindowsEvent(
                             targetEntity: String,
                             targetEntityType: String,
                             dateTimeUtc: Timestamp,
                             eventid: String,
                             eventData: Map[String, String],
                             description: String,
                             eventRecordId: String,
                             level: String,
                             machineName: String,
                             sequenceNumber: String,
                             source: String,
                             sourceMachineName: String,
                             taskCategory: String,
                             user: String,
                             machineIp: String,
                             additionalData: Map[String, String],
                             windowseventtimebin: Long
                             )


The command to write data works fine (and when queried via Beeline data is 
correct):


    val hc = new HiveContext(sc)
    import hc.implicits._


    val partitioner = new HashPartitioner(5)
    hiveWindowsEvents.foreachRDD(rdd => {
      val eventsDF = rdd.toDF()
      eventsDF
        .write
        .mode(SaveMode.Append).saveAsTable("windows_event9")
    })


Once I add the partitioning (a few partitions - three or fewer):


    val hc = new HiveContext(sc)
    import hc.implicits._


    val partitioner = new HashPartitioner(5)
    hiveWindowsEvents.foreachRDD(rdd => {
      val eventsDF = rdd.toDF()
      eventsDF
        .write
        .partitionBy("windowseventtimebin")
        .mode(SaveMode.Append).saveAsTable("windows_event9")
    })


I see the following error when writing to (3) partitions:


15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
        at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
        at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
        at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
        at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
        at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
        at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
        at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
        at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
        at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
        at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
        at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
        at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83)
        at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
        at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:530)
        at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:525)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
        at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:525)
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:262)
        ... 8 more


I have dropped the data input volume so that it is negligible (100 events / 
second) and I am still seeing this OOM error.  I also removed the 'map' fields 
from the case class above after seeing several issues with serializing maps to 
Parquet, but the same error persists.


The table (as created automatically by the Spark HiveContext, with the maps 
removed) has the following schema:


0: jdbc:hive2://localhost:10000> describe windows_event9;
+----------------------+------------+----------+
|       col_name       | data_type  | comment  |
+----------------------+------------+----------+
| targetEntity         | string     |          |
| targetEntityType     | string     |          |
| dateTimeUtc          | timestamp  |          |
| eventid              | string     |          |
| description          | string     |          |
| eventRecordId        | string     |          |
| level                | string     |          |
| machineName          | string     |          |
| sequenceNumber       | string     |          |
| source               | string     |          |
| sourceMachineName    | string     |          |
| taskCategory         | string     |          |
| user                 | string     |          |
| machineIp            | string     |          |
| windowseventtimebin  | bigint     |          |
+----------------------+------------+----------+



The extended description of the same table shows the following storage 
information:

| SerDe Library:  org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe  |
| InputFormat:    org.apache.hadoop.mapred.SequenceFileInputFormat           |
| OutputFormat:   org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat  |



This seems like a pretty big bug associated with persistent tables.  Am I 
missing a step somewhere?
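
The workaround I am considering next is to take saveAsTable out of the table 
creation path entirely: create the partitioned Parquet table explicitly in 
HiveQL and append with insertInto.  This is only a sketch - the DDL below is my 
guess at the matching table definition (with `user` backtick-quoted in case 
Hive treats it as a keyword), and I have not verified that dynamic-partition 
inserts behave any better in 1.4.1:

    // Sketch only.  Define the table up front so Hive owns the storage format,
    // then use dynamic-partition inserts instead of saveAsTable.
    hc.sql("""
      CREATE TABLE IF NOT EXISTS windows_event9 (
        targetEntity STRING,
        targetEntityType STRING,
        dateTimeUtc TIMESTAMP,
        eventid STRING,
        description STRING,
        eventRecordId STRING,
        level STRING,
        machineName STRING,
        sequenceNumber STRING,
        source STRING,
        sourceMachineName STRING,
        taskCategory STRING,
        `user` STRING,
        machineIp STRING
      )
      PARTITIONED BY (windowseventtimebin BIGINT)
      STORED AS PARQUET""")

    // Dynamic partitioning lets the partition value come from the data itself.
    hc.setConf("hive.exec.dynamic.partition", "true")
    hc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

    hiveWindowsEvents.foreachRDD(rdd => {
      // insertInto expects columns in table order with the partition column
      // (windowseventtimebin) last, which the case class above already matches
      // once the map fields are dropped.
      rdd.toDF().write.mode(SaveMode.Append).insertInto("windows_event9")
    })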


Thank you,


Bryan Jeffrey






On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
[The entire original message is not included.]
