The second issue I'm seeing is an OOM error when writing partitioned data.
I am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1, and using the Hive
libraries packaged with Spark.  Spark was compiled using the following:

    mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive -Phive-thriftserver package

Given a case class like the following:

case class HiveWindowsEvent(
                             targetEntity: String,
                             targetEntityType: String,
                             dateTimeUtc: Timestamp,
                             eventid: String,
                             eventData: Map[String, String],
                             description: String,
                             eventRecordId: String,
                             level: String,
                             machineName: String,
                             sequenceNumber: String,
                             source: String,
                             sourceMachineName: String,
                             taskCategory: String,
                             user: String,
                             machineIp: String,
                             additionalData: Map[String, String],
                             windowseventtimebin: Long
                             )

The command to write data works fine (and the data is correct when queried
via Beeline):

    val hc = new HiveContext(sc)
    import hc.implicits._

    val partitioner = new HashPartitioner(5)
    hiveWindowsEvents.foreachRDD(rdd => {
      val eventsDF = rdd.toDF()
      eventsDF
        .write
        .mode(SaveMode.Append).saveAsTable("windows_event9")
    })

Once I add the partitioning (only a few partitions, three or fewer):

    val hc = new HiveContext(sc)
    import hc.implicits._

    val partitioner = new HashPartitioner(5)
    hiveWindowsEvents.foreachRDD(rdd => {
      val eventsDF = rdd.toDF()
      eventsDF
        .write
        .partitionBy("windowseventtimebin")
        .mode(SaveMode.Append).saveAsTable("windows_event9")
    })

I see the following error when writing to (3) partitions:

15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
        at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
        at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
        at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
        at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
        at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
        at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
        at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
        at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
        at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
        at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
        at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
        at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83)
        at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
        at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:530)
        at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:525)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
        at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:525)
        at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:262)
        ... 8 more

I have dropped the data input volume so that it is negligible (100 events /
second).  I am still seeing this OOM error.  I removed the 'map' elements
in the case class above after seeing several issues with serializing maps
to Parquet, and I'm still seeing the same errors.
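
One mitigation that might be worth trying is sketched below. It is untested
and not a confirmed fix. The trace above fails while a new Parquet writer is
allocating its column buffers (initSlabs), and the writer is obtained through
a getOrElseUpdate on a map in DynamicPartitionWriterContainer, which suggests
one writer is held per partition value. Assuming those buffers scale with the
Parquet row group size, and assuming Spark 1.4.1 passes sc.hadoopConfiguration
through to the Parquet output format, lowering "parquet.block.size" may reduce
the memory each writer needs. The object name, method names, and the 16 MB
value are hypothetical and chosen only for illustration.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper object; all names here are illustrative only.
object PartitionedWriteSketch {

  // Assumption: 16 MB is an arbitrary illustrative value, not a tuned one.
  val SmallerRowGroupBytes: Int = 16 * 1024 * 1024

  // "parquet.block.size" is the Parquet row group size in bytes.
  // Set it once, before the streaming job starts writing.
  def useSmallerRowGroups(sc: SparkContext): Unit =
    sc.hadoopConfiguration.setInt("parquet.block.size", SmallerRowGroupBytes)

  // Same partitioned append as in the failing example above.
  def appendPartitioned(eventsDF: DataFrame): Unit =
    eventsDF.write
      .partitionBy("windowseventtimebin")
      .mode(SaveMode.Append)
      .saveAsTable("windows_event9")
}
```

Usage would be to call PartitionedWriteSketch.useSmallerRowGroups(sc) right
after creating the context, then PartitionedWriteSketch.appendPartitioned(rdd.toDF())
inside foreachRDD. Raising spark.executor.memory is the other obvious knob.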

The table (as created automatically by Spark HiveContext, with the maps
removed) has the following:

0: jdbc:hive2://localhost:10000> describe windows_event9;
+----------------------+------------+----------+
|       col_name       | data_type  | comment  |
+----------------------+------------+----------+
| targetEntity         | string     |          |
| targetEntityType     | string     |          |
| dateTimeUtc          | timestamp  |          |
| eventid              | string     |          |
| description          | string     |          |
| eventRecordId        | string     |          |
| level                | string     |          |
| machineName          | string     |          |
| sequenceNumber       | string     |          |
| source               | string     |          |
| sourceMachineName    | string     |          |
| taskCategory         | string     |          |
| user                 | string     |          |
| machineIp            | string     |          |
| windowseventtimebin  | bigint     |          |
+----------------------+------------+----------+

| SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe |
| InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat |
| OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat |

This seems like a pretty big bug associated with persistent tables.  Am I
missing a step somewhere?

Thank you,

Bryan Jeffrey



On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> All,
>
> One issue I'm seeing is that I start the thrift server (for jdbc access)
> via the following: /spark/spark-1.4.1/sbin/start-thriftserver.sh --master
> spark://master:7077 --hiveconf "spark.cores.max=2"
>
> After about 40 seconds the Thrift server is started and available on
> default port 10000.
>
> I then submit my application - and the application throws the following
> error:
>
> Caused by: java.sql.SQLException: Failed to start database 'metastore_db'
> with class loader
> org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721,
> see the next exception for details.
>         at
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
> Source)
>         at
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
> Source)
>         ... 86 more
> Caused by: java.sql.SQLException: Another instance of Derby may have
> already booted the database /spark/spark-1.4.1/metastore_db.
>         at
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
> Source)
>         at
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
> Source)
>         at
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown
> Source)
>         at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown
> Source)
>         ... 83 more
> Caused by: ERROR XSDB6: Another instance of Derby may have already booted
> the database /spark/spark-1.4.1/metastore_db.
>
> This also happens if I do the opposite (submit the application first, and
> then start the thrift server).
>
> It looks similar to the following issue -- but not quite the same:
> https://issues.apache.org/jira/browse/SPARK-9776
>
> It seems like this set of steps works fine if the metadata database is not
> yet created - but once it's created this happens every time.  Is this a
> known issue? Is there a workaround?
>
> Regards,
>
> Bryan Jeffrey
>
> On Wed, Oct 28, 2015 at 3:13 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
>> Susan,
>>
>> I did give that a shot -- I'm seeing a number of oddities:
>>
>> (1) 'Partition By' appears to accept only lowercase alphanumeric field names.
>> It will work for 'machinename', but not 'machineName' or 'machine_name'.
>> (2) When partitioning with maps included in the data I get odd string
>> conversion issues
>> (3) When partitioning without maps I see frequent out of memory issues
>>
>> I'll update this email when I've got a more concrete example of problems.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>>
>>
>> On Wed, Oct 28, 2015 at 1:33 PM, Susan Zhang <suchenz...@gmail.com>
>> wrote:
>>
>>> Have you tried partitionBy?
>>>
>>> Something like
>>>
>>> hiveWindowsEvents.foreachRDD( rdd => {
>>>       val eventsDataFrame = rdd.toDF()
>>>       eventsDataFrame.write.mode(SaveMode.Append)
>>>         .partitionBy("windows_event_time_bin").saveAsTable("windows_event")
>>>     })
>>>
>>>
>>>
>>> On Wed, Oct 28, 2015 at 7:41 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
>>> wrote:
>>>
>>>> Hello.
>>>>
>>>> I am working to get a simple solution working using Spark SQL.  I am
>>>> writing streaming data to persistent tables using a HiveContext.  Writing
>>>> to a persistent non-partitioned table works well - I update the table using
>>>> Spark streaming, and the output is available via Hive Thrift/JDBC.
>>>>
>>>> I create a table that looks like the following:
>>>>
>>>> 0: jdbc:hive2://localhost:10000> describe windows_event;
>>>> describe windows_event;
>>>> +--------------------------+---------------------+----------+
>>>> |         col_name         |      data_type      | comment  |
>>>> +--------------------------+---------------------+----------+
>>>> | target_entity            | string              | NULL     |
>>>> | target_entity_type       | string              | NULL     |
>>>> | date_time_utc            | timestamp           | NULL     |
>>>> | machine_ip               | string              | NULL     |
>>>> | event_id                 | string              | NULL     |
>>>> | event_data               | map<string,string>  | NULL     |
>>>> | description              | string              | NULL     |
>>>> | event_record_id          | string              | NULL     |
>>>> | level                    | string              | NULL     |
>>>> | machine_name             | string              | NULL     |
>>>> | sequence_number          | string              | NULL     |
>>>> | source                   | string              | NULL     |
>>>> | source_machine_name      | string              | NULL     |
>>>> | task_category            | string              | NULL     |
>>>> | user                     | string              | NULL     |
>>>> | additional_data          | map<string,string>  | NULL     |
>>>> | windows_event_time_bin   | timestamp           | NULL     |
>>>> | # Partition Information  |                     |          |
>>>> | # col_name               | data_type           | comment  |
>>>> | windows_event_time_bin   | timestamp           | NULL     |
>>>> +--------------------------+---------------------+----------+
>>>>
>>>>
>>>> However, when I create a partitioned table and write data using the
>>>> following:
>>>>
>>>>     hiveWindowsEvents.foreachRDD( rdd => {
>>>>       val eventsDataFrame = rdd.toDF()
>>>>       eventsDataFrame.write.mode(SaveMode.Append).saveAsTable("windows_event")
>>>>     })
>>>>
>>>> The data is written as though the table is not partitioned (everything
>>>> is written to /user/hive/warehouse/windows_event/file.gz.parquet).
>>>> Because the data does not follow the partition schema, it is not
>>>> accessible (and not partitioned).
>>>>
>>>> Is there a straightforward way to write to partitioned tables using
>>>> Spark SQL?  I understand that the read performance for partitioned data is
>>>> far better - are there other performance improvements that might be better
>>>> to use instead of partitioning?
>>>>
>>>> Regards,
>>>>
>>>> Bryan Jeffrey
>>>>
>>>
>>>
>>
>
