Hi Bryan,

Did you read the email I sent a few days ago? There are more issues with 
partitionBy down the road: 
https://www.mail-archive.com/user@spark.apache.org/msg39512.html

Best Regards,

Jerry

> On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> 
> The second issue I'm seeing is an OOM issue when writing partitioned data.  I 
> am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 & using the Hive libraries 
> packaged with Spark.  Spark was compiled using the following:  mvn 
> -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive 
> -Phive-thriftserver package
> 
> Given a case class like the following:
> 
> case class HiveWindowsEvent(
>                              targetEntity: String,
>                              targetEntityType: String,
>                              dateTimeUtc: Timestamp,
>                              eventid: String,
>                              eventData: Map[String, String],
>                              description: String,
>                              eventRecordId: String,
>                              level: String,
>                              machineName: String,
>                              sequenceNumber: String,
>                              source: String,
>                              sourceMachineName: String,
>                              taskCategory: String,
>                              user: String,
>                              machineIp: String,
>                              additionalData: Map[String, String],
>                              windowseventtimebin: Long
>                              )
> 
> The command to write data works fine (and when queried via Beeline the data is 
> correct):
> 
>     val hc = new HiveContext(sc)
>     import hc.implicits._
> 
>     val partitioner = new HashPartitioner(5)
>     hiveWindowsEvents.foreachRDD(rdd => {
>       val eventsDF = rdd.toDF()
>       eventsDF
>         .write
>         .mode(SaveMode.Append).saveAsTable("windows_event9")
>     })
> 
> Once I add the partitioning (a few partitions - three or fewer):
> 
>     val hc = new HiveContext(sc)
>     import hc.implicits._
> 
>     val partitioner = new HashPartitioner(5)
>     hiveWindowsEvents.foreachRDD(rdd => {
>       val eventsDF = rdd.toDF()
>       eventsDF
>         .write
>         .partitionBy("windowseventtimebin")
>         .mode(SaveMode.Append).saveAsTable("windows_event9")
>     })
> 
> I see the following error when writing to (3) partitions:
> 
> 15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 
> 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
>         at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>         at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.OutOfMemoryError: Java heap space
>         at 
> parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
>         at 
> parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
>         at 
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
>         at 
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
>         at 
> parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
>         at 
> parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
>         at 
> parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>         at 
> parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>         at 
> parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>         at 
> parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>         at 
> parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>         at 
> parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>         at 
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>         at 
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>         at 
> org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83)
>         at 
> org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
>         at 
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:530)
>         at 
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:525)
>         at 
> scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
>         at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
>         at 
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:525)
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:262)
>         ... 8 more
> 
> I have dropped the data input volume so that it is negligible (100 events / 
> second).  I am still seeing this OOM error.  I removed the 'map' elements in 
> the case class above after seeing several issues with serializing maps to 
> Parquet, and I'm still seeing the same errors.
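> 
> One possible mitigation (a sketch only, not a confirmed fix): the stack trace 
> suggests each open Parquet writer buffers up to a full row group on the heap, 
> and dynamic partitioning opens one writer per partition value in each task, so 
> shrinking the Parquet block size might reduce the per-task memory footprint.  
> The 64 MB value below is just an example:
> 
>     // Shrink the Parquet row-group buffer; with dynamic partitioning each
>     // task holds one such buffer per partition value it writes.
>     sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)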
> 
> The table (as created automatically by Spark HiveContext, with the maps 
> removed) has the following:
> 
> 0: jdbc:hive2://localhost:10000> describe windows_event9;
> +----------------------+------------+----------+
> |       col_name       | data_type  | comment  |
> +----------------------+------------+----------+
> | targetEntity         | string     |          |
> | targetEntityType     | string     |          |
> | dateTimeUtc          | timestamp  |          |
> | eventid              | string     |          |
> | description          | string     |          |
> | eventRecordId        | string     |          |
> | level                | string     |          |
> | machineName          | string     |          |
> | sequenceNumber       | string     |          |
> | source               | string     |          |
> | sourceMachineName    | string     |          |
> | taskCategory         | string     |          |
> | user                 | string     |          |
> | machineIp            | string     |          |
> | windowseventtimebin  | bigint     |          |
> +----------------------+------------+----------+
> 
> | SerDe Library:   org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe  |
> | InputFormat:     org.apache.hadoop.mapred.SequenceFileInputFormat           |
> | OutputFormat:    org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat  |
> 
> This seems like a pretty big bug associated with persistent tables.  Am I 
> missing a step somewhere?
> 
> Thank you,
> 
> Bryan Jeffrey
> 
> 
> 
> On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> All,
> 
> One issue I'm seeing is that I start the thrift server (for jdbc access) via 
> the following: /spark/spark-1.4.1/sbin/start-thriftserver.sh --master 
> spark://master:7077 --hiveconf "spark.cores.max=2"
> 
> After about 40 seconds the Thrift server is started and available on default 
> port 10000.
> 
> I then submit my application - and the application throws the following 
> error: 
> 
> Caused by: java.sql.SQLException: Failed to start database 'metastore_db' 
> with class loader 
> org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721, see 
> the next exception for details.
>         at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
>         at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>  Source)
>         ... 86 more
> Caused by: java.sql.SQLException: Another instance of Derby may have already 
> booted the database /spark/spark-1.4.1/metastore_db.
>         at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
>         at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>  Source)
>         at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown 
> Source)
>         at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown 
> Source)
>         ... 83 more
> Caused by: ERROR XSDB6: Another instance of Derby may have already booted the 
> database /spark/spark-1.4.1/metastore_db.
> 
> This also happens if I do the opposite (submit the application first, and 
> then start the thrift server).
> 
> It looks similar to the following issue -- but not quite the same: 
> https://issues.apache.org/jira/browse/SPARK-9776
> 
> It seems like this set of steps works fine if the metadata database is not 
> yet created - but once it's created this happens every time.  Is this a known 
> issue? Is there a workaround?
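> 
> A sketch of one possible workaround (not verified): the embedded Derby 
> metastore can only be opened by a single JVM, so pointing both the Thrift 
> server and the application at a shared metastore service - normally configured 
> in hive-site.xml on the classpath - should avoid the lock.  The host and port 
> below are placeholders:
> 
>     // Use an external Hive metastore service instead of the embedded Derby
>     // database under /spark/spark-1.4.1/metastore_db.  Setting this
>     // programmatically may not take effect if the metastore client has
>     // already been initialized; hive-site.xml is the usual place for it.
>     val hc = new org.apache.spark.sql.hive.HiveContext(sc)
>     hc.setConf("hive.metastore.uris", "thrift://metastore-host:9083")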
> 
> Regards,
> 
> Bryan Jeffrey
> 
> On Wed, Oct 28, 2015 at 3:13 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Susan,
> 
> I did give that a shot -- I'm seeing a number of oddities:
> 
> (1) 'partitionBy' appears to accept only lowercase alphanumeric field names.  It 
> will work for 'machinename', but not 'machineName' or 'machine_name' (see the 
> sketch after this list).
> (2) When partitioning with maps included in the data I get odd string 
> conversion issues.
> (3) When partitioning without maps I see frequent out-of-memory errors.
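> 
> For (1), a minimal sketch of a possible workaround (untested; the partition 
> column name is just illustrative, and this wouldn't help with the underscore 
> case): rename the camel-case columns to lower case before calling partitionBy.
> 
>     // Rename every column to its lower-case form so the partition column
>     // matches what partitionBy appears to expect.
>     val lowered = eventsDF.columns.foldLeft(eventsDF) { (df, name) =>
>       df.withColumnRenamed(name, name.toLowerCase)
>     }
>     lowered.write
>       .partitionBy("machinename")
>       .mode(SaveMode.Append)
>       .saveAsTable("windows_event9")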
> 
> I'll update this email when I've got a more concrete example of problems.
> 
> Regards,
> 
> Bryan Jeffrey
> 
> 
> 
> On Wed, Oct 28, 2015 at 1:33 PM, Susan Zhang <suchenz...@gmail.com> wrote:
> Have you tried partitionBy? 
> 
> Something like
> 
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDataFrame = rdd.toDF()
>   eventsDataFrame.write
>     .mode(SaveMode.Append)
>     .partitionBy("windows_event_time_bin")
>     .saveAsTable("windows_event")
> })
> 
> 
> 
> On Wed, Oct 28, 2015 at 7:41 AM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Hello.
> 
> I am trying to get a simple solution working with Spark SQL.  I am writing 
> streaming data to persistent tables using a HiveContext.  Writing to a 
> persistent non-partitioned table works well - I update the table using Spark 
> streaming, and the output is available via Hive Thrift/JDBC. 
> 
> I create a table that looks like the following:
> 
> 0: jdbc:hive2://localhost:10000> describe windows_event;
> +--------------------------+---------------------+----------+
> |         col_name         |      data_type      | comment  |
> +--------------------------+---------------------+----------+
> | target_entity            | string              | NULL     |
> | target_entity_type       | string              | NULL     |
> | date_time_utc            | timestamp           | NULL     |
> | machine_ip               | string              | NULL     |
> | event_id                 | string              | NULL     |
> | event_data               | map<string,string>  | NULL     |
> | description              | string              | NULL     |
> | event_record_id          | string              | NULL     |
> | level                    | string              | NULL     |
> | machine_name             | string              | NULL     |
> | sequence_number          | string              | NULL     |
> | source                   | string              | NULL     |
> | source_machine_name      | string              | NULL     |
> | task_category            | string              | NULL     |
> | user                     | string              | NULL     |
> | additional_data          | map<string,string>  | NULL     |
> | windows_event_time_bin   | timestamp           | NULL     |
> | # Partition Information  |                     |          |
> | # col_name               | data_type           | comment  |
> | windows_event_time_bin   | timestamp           | NULL     |
> +--------------------------+---------------------+----------+
> 
> 
> However, when I create a partitioned table and write data using the following:
> 
>     hiveWindowsEvents.foreachRDD( rdd => {
>       val eventsDataFrame = rdd.toDF()
>       eventsDataFrame.write.mode(SaveMode.Append).saveAsTable("windows_event")
>     })
> 
> The data is written as though the table is not partitioned (so everything is 
> written to /user/hive/warehouse/windows_event/file.gz.parquet).  Because the 
> data does not follow the partition schema, it is not accessible (and not 
> partitioned).
> 
> Is there a straightforward way to write to partitioned tables using Spark 
> SQL?  I understand that the read performance for partitioned data is far 
> better - are there other performance improvements that might be better to use 
> instead of partitioning?
> 
> Regards,
> 
> Bryan Jeffrey
> 
> 
> 
> 
