Yana,

My basic use case is that I want to process streaming data and publish it to a 
persistent Spark table. After that I want to make the published data (results) 
available via JDBC and Spark SQL to drive a web API. That would seem to require 
two drivers starting separate HiveContexts (one for Spark SQL/JDBC, one for 
streaming).

Is there a way to share a HiveContext between the driver for the Thrift Spark 
SQL instance and the streaming Spark driver? Or is there a better way to do this?

An alternate option might be to create the table in two separate metastores and 
simply use the same storage location for the data. That seems very hacky 
though, and likely to result in maintenance issues.
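
One way the two could share a context, assuming Spark 1.4.x with the spark-hive-thriftserver module on the classpath, is to start the Thrift server inside the streaming driver through HiveThriftServer2.startWithContext (a developer API). The sketch below has not been run against this setup; the object name, the Event case class, the socket source on localhost:9999, and the table name "events" are placeholders rather than anything from this thread.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder record type; defined at top level so toDF() can derive a schema.
case class Event(line: String)

object StreamingWithThriftServer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("streaming-with-thrift-server")
    val ssc = new StreamingContext(conf, Seconds(10))

    // A single HiveContext, used for both the streaming writes and JDBC queries.
    val hiveContext = new HiveContext(ssc.sparkContext)
    import hiveContext.implicits._

    // Expose this same context over JDBC instead of running start-thriftserver.sh
    // as a second driver with its own metastore connection.
    HiveThriftServer2.startWithContext(hiveContext)

    // Hypothetical input source; replace with the real stream.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { rdd =>
      rdd.map(Event(_)).toDF().write.mode(SaveMode.Append).saveAsTable("events")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this layout only one driver opens the metastore, so the separate Thrift server process would no longer be started.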

Regards,

Bryan Jeffrey

-----Original Message-----
From: "Yana Kadiyska" <yana.kadiy...@gmail.com>
Sent: 10/28/2015 8:32 PM
To: "Bryan Jeffrey" <bryan.jeff...@gmail.com>
Cc: "Susan Zhang" <suchenz...@gmail.com>; "user" <user@spark.apache.org>
Subject: Re: Spark -- Writing to Partitioned Persistent Table

For this issue in particular (ERROR XSDB6: Another instance of Derby may have 
already booted the database /spark/spark-1.4.1/metastore_db), I think it 
depends on where you start your application and the HiveThriftServer from. I've 
run into a similar issue running a driver app first, which creates a directory 
called metastore_db. If I then try to start the Spark shell from the same 
directory, I see this exception. So it is like SPARK-9776. It's not so much 
that the two are in the same process (as the bug resolution states); I think 
you can't run two drivers that each start a HiveContext from the same directory.
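
To make the working-directory dependence concrete, here is a small diagnostic sketch in plain Scala. It assumes the default embedded Derby metastore, whose database directory "metastore_db" is resolved relative to the directory the driver is started from, and it assumes Derby's usual lock file name "db.lck". The object and method names are hypothetical.

```scala
import java.io.File

object MetastoreDirCheck {
  // Directory an embedded Derby metastore would use for a driver started in workDir,
  // assuming the default relative database name "metastore_db".
  def metastoreDir(workDir: File): File = new File(workDir, "metastore_db")

  // True if a Derby lock file is present, which suggests (but does not prove)
  // that another JVM currently has the database booted.
  def looksLocked(workDir: File): Boolean =
    new File(metastoreDir(workDir), "db.lck").exists()

  def main(args: Array[String]): Unit = {
    val cwd = new File(System.getProperty("user.dir"))
    println(s"metastore dir: ${metastoreDir(cwd).getAbsolutePath}")
    println(s"lock file present: ${looksLocked(cwd)}")
  }
}
```

Running this from the directory where each driver is launched shows whether both would resolve to the same metastore_db.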




On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:

All,


One issue I'm seeing is that I start the Thrift server (for JDBC access) via 
the following:

/spark/spark-1.4.1/sbin/start-thriftserver.sh --master spark://master:7077 --hiveconf "spark.cores.max=2"


After about 40 seconds the Thrift server is started and available on default 
port 10000.


I then submit my application - and the application throws the following error:  


Caused by: java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721, see the next exception for details.
        at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown Source)
        ... 86 more
Caused by: java.sql.SQLException: Another instance of Derby may have already booted the database /spark/spark-1.4.1/metastore_db.
        at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown Source)
        at org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
        ... 83 more
Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /spark/spark-1.4.1/metastore_db.


This also happens if I do the opposite (submit the application first, and then 
start the thrift server).


It looks similar to the following issue -- but not quite the same: 
https://issues.apache.org/jira/browse/SPARK-9776


It seems like this set of steps works fine if the metadata database is not yet 
created - but once it's created this happens every time.  Is this a known 
issue? Is there a workaround?


Regards,


Bryan Jeffrey



On Wed, Oct 28, 2015 at 3:13 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:

Susan,


I did give that a shot -- I'm seeing a number of oddities:


(1) 'Partition By' appears to accept only lower-case alphanumeric field names.  
It will work for 'machinename', but not 'machineName' or 'machine_name'.
(2) When partitioning with maps included in the data, I get odd string 
conversion issues.
(3) When partitioning without maps, I see frequent out-of-memory issues.
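
A possible workaround for (1), assuming the restriction really is "lower-case letters and digits only" as observed above, is to normalize the column name before partitioning. The helper below is a hypothetical illustration in plain Scala, not part of Spark.

```scala
import java.util.Locale

object PartitionColumnNames {
  // Lower-cases the name and drops every character outside [a-z0-9].
  def toPartitionSafe(name: String): String =
    name.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9]", "")

  def main(args: Array[String]): Unit = {
    Seq("machinename", "machineName", "machine_name")
      .foreach(n => println(s"$n -> ${toPartitionSafe(n)}"))
  }
}
```

On a DataFrame, the column could then be renamed before the write, for example with df.withColumnRenamed("machine_name", PartitionColumnNames.toPartitionSafe("machine_name")), and the new name passed to partitionBy. Two distinct columns can collapse to the same normalized name, so this only helps when the results stay unique.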


I'll follow up on this thread when I have a more concrete example of the problems.


Regards,


Bryan Jeffrey






On Wed, Oct 28, 2015 at 1:33 PM, Susan Zhang <suchenz...@gmail.com> wrote:

Have you tried partitionBy? 


Something like


hiveWindowsEvents.foreachRDD( rdd => {
  val eventsDataFrame = rdd.toDF()
  eventsDataFrame.write
    .mode(SaveMode.Append)
    .partitionBy("windows_event_time_bin")
    .saveAsTable("windows_event")
})







On Wed, Oct 28, 2015 at 7:41 AM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:

Hello.


I am trying to get a simple solution working using Spark SQL.  I am writing 
streaming data to persistent tables using a HiveContext.  Writing to a 
persistent non-partitioned table works well - I update the table using Spark 
streaming, and the output is available via Hive Thrift/JDBC.  


I create a table that looks like the following:


0: jdbc:hive2://localhost:10000> describe windows_event;
describe windows_event;
+--------------------------+---------------------+----------+
|         col_name         |      data_type      | comment  |
+--------------------------+---------------------+----------+
| target_entity            | string              | NULL     |
| target_entity_type       | string              | NULL     |
| date_time_utc            | timestamp           | NULL     |
| machine_ip               | string              | NULL     |
| event_id                 | string              | NULL     |
| event_data               | map<string,string>  | NULL     |
| description              | string              | NULL     |
| event_record_id          | string              | NULL     |
| level                    | string              | NULL     |
| machine_name             | string              | NULL     |
| sequence_number          | string              | NULL     |
| source                   | string              | NULL     |
| source_machine_name      | string              | NULL     |
| task_category            | string              | NULL     |
| user                     | string              | NULL     |
| additional_data          | map<string,string>  | NULL     |
| windows_event_time_bin   | timestamp           | NULL     |
| # Partition Information  |                     |          |
| # col_name               | data_type           | comment  |
| windows_event_time_bin   | timestamp           | NULL     |
+--------------------------+---------------------+----------+




However, when I create a partitioned table and write data using the following:


    hiveWindowsEvents.foreachRDD( rdd => {
      val eventsDataFrame = rdd.toDF()
      eventsDataFrame.write.mode(SaveMode.Append).saveAsTable("windows_event")
    })


The data is written as though the table is not partitioned (so everything is 
written to /user/hive/warehouse/windows_event/file.gz.parquet).  Because the 
data does not follow the partition schema, it is not accessible (and not 
partitioned).


Is there a straightforward way to write to partitioned tables using Spark SQL?  
I understand that read performance for partitioned data is far better.  Are 
there other optimizations that might be better to use instead of partitioning?


Regards,


Bryan Jeffrey
