Hi Bryan,

Did you read the email I sent a few days ago? There are more issues with partitionBy down the road: https://www.mail-archive.com/user@spark.apache.org/msg39512.html
Best Regards,

Jerry

> On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>
> The second issue I'm seeing is an OOM issue when writing partitioned data. I
> am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 and the Hive libraries
> packaged with Spark. Spark was compiled using the following:
>
> mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive -Phive-thriftserver package
>
> Given a case class like the following:
>
> case class HiveWindowsEvent(
>   targetEntity: String,
>   targetEntityType: String,
>   dateTimeUtc: Timestamp,
>   eventid: String,
>   eventData: Map[String, String],
>   description: String,
>   eventRecordId: String,
>   level: String,
>   machineName: String,
>   sequenceNumber: String,
>   source: String,
>   sourceMachineName: String,
>   taskCategory: String,
>   user: String,
>   machineIp: String,
>   additionalData: Map[String, String],
>   windowseventtimebin: Long
> )
>
> The command to write data works fine (and when queried via Beeline the data is
> correct):
>
> val hc = new HiveContext(sc)
> import hc.implicits._
>
> val partitioner = new HashPartitioner(5)
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDF = rdd.toDF()
>   eventsDF
>     .write
>     .mode(SaveMode.Append).saveAsTable("windows_event9")
> })
>
> Once I add the partitioning (a few partitions - three or fewer):
>
> val hc = new HiveContext(sc)
> import hc.implicits._
>
> val partitioner = new HashPartitioner(5)
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDF = rdd.toDF()
>   eventsDF
>     .write
>     .partitionBy("windowseventtimebin")
>     .mode(SaveMode.Append).saveAsTable("windows_event9")
> })
>
> I see the following error when writing to (3) partitions:
>
> 15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.OutOfMemoryError: Java heap space
>   at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
>   at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
>   at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
>   at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
>   at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
>   at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
>   at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>   at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>   at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>   at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>   at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>   at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>   at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>   at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>   at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83)
>   at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:530)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:525)
>   at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
>   at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:525)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:262)
>   ... 8 more
>
> I have dropped the data input volume so that it is negligible (100 events /
> second). I am still seeing this OOM error. I removed the 'map' elements in
> the case class above after seeing several issues with serializing maps to
> Parquet, and I'm still seeing the same errors.
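One mitigation sometimes suggested for this kind of dynamic-partition Parquet OOM, offered here only as a hedged sketch under the definitions above (sc, hiveWindowsEvents, the HiveWindowsEvent case class): each distinct value of the partition column opens its own Parquet writer inside a task, and every writer pre-allocates column buffers sized from the Parquet block/page settings, so shrinking those settings and pre-grouping rows by the partition value reduces peak heap use per task. The parquet.block.size / parquet.page.size keys are the standard parquet-mr settings; whether the Spark 1.4 write path picks them up from sc.hadoopConfiguration is an assumption to verify.

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
import hc.implicits._

// Shrink the per-writer Parquet buffers (parquet-mr defaults: 128 MB block, 1 MB page).
sc.hadoopConfiguration.setInt("parquet.block.size", 16 * 1024 * 1024)
sc.hadoopConfiguration.setInt("parquet.page.size", 64 * 1024)

hiveWindowsEvents.foreachRDD { rdd =>
  // Co-locate rows with the same time bin so each task keeps as few
  // Parquet writers open concurrently as possible.
  val grouped = rdd
    .map(event => (event.windowseventtimebin, event))
    .partitionBy(new HashPartitioner(3))
    .map { case (_, event) => event }

  grouped.toDF()
    .write
    .partitionBy("windowseventtimebin")
    .mode(SaveMode.Append)
    .saveAsTable("windows_event9")
}

If heap is still tight after that, reducing the number of distinct time bins written per batch is the other obvious lever.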
> The table (as created automatically by Spark HiveContext, with the maps
> removed) has the following:
>
> 0: jdbc:hive2://localhost:10000> describe windows_event9;
> +----------------------+------------+----------+
> |       col_name       | data_type  | comment  |
> +----------------------+------------+----------+
> | targetEntity         | string     |          |
> | targetEntityType     | string     |          |
> | dateTimeUtc          | timestamp  |          |
> | eventid              | string     |          |
> | description          | string     |          |
> | eventRecordId        | string     |          |
> | level                | string     |          |
> | machineName          | string     |          |
> | sequenceNumber       | string     |          |
> | source               | string     |          |
> | sourceMachineName    | string     |          |
> | taskCategory         | string     |          |
> | user                 | string     |          |
> | machineIp            | string     |          |
> | windowseventtimebin  | bigint     |          |
> +----------------------+------------+----------+
>
> | SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe |
> | InputFormat:   org.apache.hadoop.mapred.SequenceFileInputFormat          |
> | OutputFormat:  org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat |
>
> This seems like a pretty big bug associated with persistent tables. Am I
> missing a step somewhere?
>
> Thank you,
>
> Bryan Jeffrey
>
>
> On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>
> All,
>
> One issue I'm seeing is that I start the thrift server (for JDBC access) via
> the following:
>
> /spark/spark-1.4.1/sbin/start-thriftserver.sh --master spark://master:7077 --hiveconf "spark.cores.max=2"
>
> After about 40 seconds the Thrift server is started and available on the
> default port 10000.
>
> I then submit my application - and the application throws the following
> error:
>
> Caused by: java.sql.SQLException: Failed to start database 'metastore_db' with
> class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721,
> see the next exception for details.
>   at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
>   at org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown Source)
>   ... 86 more
> Caused by: java.sql.SQLException: Another instance of Derby may have already
> booted the database /spark/spark-1.4.1/metastore_db.
>   at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
>   at org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown Source)
>   at org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source)
>   at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
>   ... 83 more
> Caused by: ERROR XSDB6: Another instance of Derby may have already booted the
> database /spark/spark-1.4.1/metastore_db.
>
> This also happens if I do the opposite (submit the application first, and
> then start the thrift server).
>
> It looks similar to the following issue -- but not quite the same:
> https://issues.apache.org/jira/browse/SPARK-9776
>
> It seems like this set of steps works fine if the metastore database has not
> yet been created - but once it has been created this happens every time. Is
> this a known issue? Is there a workaround?
>
> Regards,
>
> Bryan Jeffrey
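The ERROR XSDB6 above is embedded Derby's single-process lock: the Thrift server and the submitted application are both trying to boot the same local metastore_db directory, and embedded Derby allows only one JVM at a time. The usual way around it is to point both processes at a shared metastore database instead of the local Derby files, by placing a hive-site.xml in Spark's conf/ directory (and putting the matching JDBC driver jar on the classpath). A minimal sketch, assuming a MySQL instance is available; the host, database name, and credentials below are placeholders:

<!-- conf/hive-site.xml (sketch; metastore-host, hive_metastore and the credentials are placeholders) -->
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://metastore-host:3306/hive_metastore?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive</value>
  </property>
</configuration>

With both processes reading the same hive-site.xml they share one metastore, so tables created by the streaming job are visible through the Thrift server and neither process fights over the Derby lock.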
> On Wed, Oct 28, 2015 at 3:13 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>
> Susan,
>
> I did give that a shot -- I'm seeing a number of oddities:
>
> (1) 'partitionBy' appears to only accept lowercase alphanumeric field names. It
>     will work for 'machinename', but not 'machineName' or 'machine_name'.
> (2) When partitioning with maps included in the data I get odd string
>     conversion issues.
> (3) When partitioning without maps I see frequent out-of-memory issues.
>
> I'll update this email when I've got a more concrete example of the problems.
>
> Regards,
>
> Bryan Jeffrey
>
>
> On Wed, Oct 28, 2015 at 1:33 PM, Susan Zhang <suchenz...@gmail.com> wrote:
>
> Have you tried partitionBy?
>
> Something like
>
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDataFrame = rdd.toDF()
>   eventsDataFrame.write.mode(SaveMode.Append).partitionBy("windows_event_time_bin").saveAsTable("windows_event")
> })
>
>
> On Wed, Oct 28, 2015 at 7:41 AM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>
> Hello.
>
> I am working to get a simple solution working using Spark SQL. I am writing
> streaming data to persistent tables using a HiveContext. Writing to a
> persistent non-partitioned table works well - I update the table using Spark
> Streaming, and the output is available via Hive Thrift/JDBC.
>
> I create a table that looks like the following:
>
> 0: jdbc:hive2://localhost:10000> describe windows_event;
> +--------------------------+---------------------+----------+
> |         col_name         |      data_type      | comment  |
> +--------------------------+---------------------+----------+
> | target_entity            | string              | NULL     |
> | target_entity_type       | string              | NULL     |
> | date_time_utc            | timestamp           | NULL     |
> | machine_ip               | string              | NULL     |
> | event_id                 | string              | NULL     |
> | event_data               | map<string,string>  | NULL     |
> | description              | string              | NULL     |
> | event_record_id          | string              | NULL     |
> | level                    | string              | NULL     |
> | machine_name             | string              | NULL     |
> | sequence_number          | string              | NULL     |
> | source                   | string              | NULL     |
> | source_machine_name      | string              | NULL     |
> | task_category            | string              | NULL     |
> | user                     | string              | NULL     |
> | additional_data          | map<string,string>  | NULL     |
> | windows_event_time_bin   | timestamp           | NULL     |
> | # Partition Information  |                     |          |
> | # col_name               | data_type           | comment  |
> | windows_event_time_bin   | timestamp           | NULL     |
> +--------------------------+---------------------+----------+
>
> However, when I create a partitioned table and write data using the following:
>
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDataFrame = rdd.toDF()
>   eventsDataFrame.write.mode(SaveMode.Append).saveAsTable("windows_event")
> })
>
> the data is written as though the table were not partitioned (everything is
> written to /user/hive/warehouse/windows_event/file.gz.parquet). Because the
> data does not follow the partition scheme, it is not accessible (and not
> partitioned).
>
> Is there a straightforward way to write to partitioned tables using Spark
> SQL? I understand that the read performance for partitioned data is far
> better - are there other performance improvements that might be better to use
> instead of partitioning?
>
> Regards,
>
> Bryan Jeffrey
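For reference, a minimal end-to-end sketch of the partitioned write discussed in this thread, using the later HiveWindowsEvent schema where the partition column is already lowercase ("windowseventtimebin"). The lowercasing of the remaining column names is only a guess at a workaround for oddity (1) above (partitionBy appearing to accept only lowercase field names); it is not a confirmed fix and does not address the underscore case.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
import hc.implicits._

hiveWindowsEvents.foreachRDD { rdd =>
  val eventsDF = rdd.toDF()

  // Guard against the case-sensitivity oddity by lowercasing every column
  // name before the partitioned write.
  val lowered = eventsDF.columns.foldLeft(eventsDF) { (df, name) =>
    df.withColumnRenamed(name, name.toLowerCase)
  }

  lowered
    .write
    .partitionBy("windowseventtimebin")
    .mode(SaveMode.Append)
    .saveAsTable("windows_event")
}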