Will nested field performance improve?
Hello,

I'm trying to make a call on whether my team should invest time adding a step to "flatten" our schema as part of our ETL pipeline, to improve the performance of interactive queries. Our data starts out life as Avro before being converted to Parquet, and so we follow the Avro idiom of creating our own types to reduce boilerplate in many areas. For example, every record we define has a "metadata" struct field containing all the fields that are common to every record as part of the system design. Those fields are needed so often that virtually all queries access them. As a result, nearly all of our queries don't see the best performance we could be getting from Spark SQL, etc.

So my question: is this just inherently the way it is, or do we expect future releases to put nested fields on a par with flat fields? I ask because I've actually seen similar differences in performance with Presto too. In benchmarks of both Spark and Presto, I generally see queries working on flat fields run 5-6x faster than queries doing the same thing on a nested field.

If we expect fields nested in structs to always be much slower than flat fields, then I'd be keen to address that in our ETL pipeline with a flattening step. If it's a known issue that we expect will be fixed in upcoming releases, I'll hold off.

Any advice greatly appreciated!

Thanks,
James.
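For concreteness, here's roughly the kind of flattening step we'd add - a minimal sketch only, assuming an existing SQLContext as sqlContext, and with made-up field names under "metadata":

    import org.apache.spark.sql.DataFrame;

    DataFrame nested = sqlContext.read()
        .format("com.databricks.spark.avro")
        .load("/path/to/avro");

    // Promote each nested metadata field to a top-level column, then write
    // the flat schema out as Parquet for interactive querying.
    DataFrame flat = nested.select(
        nested.col("metadata.timestamp").as("metadata_timestamp"),
        nested.col("metadata.source").as("metadata_source"),
        nested.col("payload"));

    flat.write().parquet("/path/to/flat-parquet");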
Unreachable dead objects permanently retained on heap
Hi,

We have an application that submits several thousand jobs within the same SparkContext, using a thread pool to run about 50 in parallel. We're running on YARN using Spark 1.4.1, and seeing a problem where our driver is killed by YARN for running beyond physical memory limits (no Java OOM stack trace, though).

Plugging in YourKit, I can see that the application is in fact running low on heap. The suspicious thing we're seeing is that the old generation is filling up with dead objects, which don't seem to be fully removed during the stop-the-world sweeps we see happening later in the running of the application. With allocation tracking enabled, I can see that maybe 80%+ of that dead heap space consists of byte arrays, which appear to contain some snappy-compressed Hadoop configuration data. Many of them are 4MB each; others are hundreds of KB. The allocation tracking reveals that they were originally allocated in calls to sparkContext.hadoopFile() (from AvroRelation in spark-avro). It seems that this data was broadcast to the executors as a result of that call? I'm not clear on the implementation details, but I can imagine that might be necessary.

This application is essentially a batch job that takes many Avro files and merges them into larger Parquet files. It builds a DataFrame for each group of Avro files, then for each DataFrame starts a job using .coalesce(N).write().parquet() on a fixed-size thread pool. It seems that with each of those calls, another chunk of heap space disappears into one of these byte arrays and is never reclaimed. I understand that broadcast variables remain in memory on the driver application in their serialized form, and that at least appears to be consistent with what I'm seeing here.

Question is, what can we do about this? Is there a way to reclaim this memory? Should those arrays be GC'ed when jobs finish?

Any guidance greatly appreciated.

Many thanks,
James.
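For reference, each submitted job has roughly this shape - a sketch only, with illustrative paths and pool size rather than our exact code:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.spark.sql.DataFrame;

    ExecutorService pool = Executors.newFixedThreadPool(50);

    // Load one batch of Avro files; internally this goes through
    // sparkContext.hadoopFile(), where the suspicious byte arrays are
    // allocated.
    final DataFrame df = sqlContext.read()
        .format("com.databricks.spark.avro")
        .load("/in/avro-batch-0001");

    pool.submit(new Runnable() {
      public void run() {
        // After each of these, another compressed-Hadoop-conf byte array
        // seems to stay behind on the driver heap.
        df.coalesce(4).write().parquet("/out/merged-0001");
      }
    });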
Java UDFs in GROUP BY expressions
Hi everyone,

I raised this JIRA ticket back in July: https://issues.apache.org/jira/browse/SPARK-9435

The problem is that Spark SQL doesn't seem to recognise columns we transform with a Java UDF when they're referenced in the GROUP BY clause. There's a minimal reproduction Java file attached to the ticket to illustrate the issue. The equivalent code in Scala works fine for me.

Is anyone else seeing this problem? For us, the attached code fails every time on Spark 1.4.1.

Thanks,
James
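For anyone who doesn't want to open the attachment, the failing pattern is roughly this - a sketch only, with made-up names; the actual reproduction is on the ticket:

    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;

    sqlContext.udf().register("normalise", new UDF1<String, String>() {
      @Override
      public String call(String s) {
        return s == null ? null : s.toLowerCase();
      }
    }, DataTypes.StringType);

    // Referencing the UDF in both SELECT and GROUP BY fails to resolve
    // for us on 1.4.1 when written from Java:
    DataFrame grouped = sqlContext.sql(
        "SELECT normalise(name) AS n, COUNT(*) AS c " +
        "FROM people GROUP BY normalise(name)");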
Re: Help optimising Spark SQL query
Thanks everybody for the advice on this.

I attached YourKit and found that the CPU time split was about 70% in Parquet/LZO reading and 30% applying the filter predicate. I guess those are reasonable things for it to be spending time on, so it really could just be a case of needing more hardware to cope with that volume of rows. That's not such a problem, as the cluster wasn't exactly huge when testing - just a couple of nodes.

Further, we've not been making use of the partitioning support for Parquet data, which would actually give us a simple way to control how much historical data to go sifting through. It turns out we're already writing our data as type/timestamp/parquet file; we just missed the date= naming convention - d'oh! At least that means a fairly simple rename script should get us out of trouble!

Appreciate everyone's tips, thanks again!

James.

On 23 June 2015 at 17:25, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote:

64GB in Parquet could be many billions of rows, because of the columnar compression. And count distinct by itself is an expensive operation. This is not just on Spark; even on Presto/Impala you would see performance dip with count distincts. And the cluster is not that powerful either.

The one issue here is that Spark has to sift through all the data to get to just a week's worth. To achieve better performance, you might want to partition the data by date/week; then Spark wouldn't have to sift through all the billions of rows to get to the millions it needs to aggregate.

Regards
Sab

On Tue, Jun 23, 2015 at 4:35 PM, James Aley james.a...@swiftkey.com wrote:

Thanks for the suggestions everyone, appreciate the advice. I tried replacing the DISTINCT with a nested GROUP BY, running on 1.4 instead of 1.3, replacing the date casts with a BETWEEN on the corresponding long constants, and changing COUNT(*) to COUNT(1). None of these seems to have made any remarkable difference in running time for the query. I'll hook up YourKit and see if we can figure out where the CPU time is going, then post back.

On 22 June 2015 at 16:01, Yin Huai yh...@databricks.com wrote:

Hi James,

Maybe it's the DISTINCT causing the issue. I rewrote the query as follows. Maybe this one can finish faster.

    select sum(cnt) as uses, count(id) as users
    from (
      select count(*) cnt, cast(id as string) as id
      from usage_events
      where from_unixtime(cast(timestamp_millis/1000 as bigint))
        between '2015-06-09' and '2015-06-16'
      group by cast(id as string)
    ) tmp

Thanks,
Yin

On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke jornfra...@gmail.com wrote:

Generally (not only Spark SQL specific), you should not cast in the WHERE part of a SQL query. It is also not necessary in your case. Getting rid of casts in the whole query will be beneficial too.

Le lun. 22 juin 2015 à 17:29, James Aley james.a...@swiftkey.com a écrit :

Hello,

A colleague of mine ran the following Spark SQL query:

    select
      count(*) as uses,
      count(distinct cast(id as string)) as users
    from usage_events
    where from_unixtime(cast(timestamp_millis/1000 as bigint))
      between '2015-06-09' and '2015-06-16'

The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3.

From the referenced columns:

* id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.)
* timestamp_millis is a long, containing a unix timestamp with millisecond resolution

This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.

Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves.

A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case) just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues.

Many thanks,
James.
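For anyone finding this thread later, the fix we're applying is date-partitioned output, so Spark only reads the directories a query actually needs. A sketch using the Spark 1.4 writer API, with an assumed date column:

    // Writes .../date=2015-06-09/part-*.parquet etc., enabling partition
    // pruning for queries filtered on date:
    df.write()
        .partitionBy("date")
        .parquet("s3n://our-bucket/usage_events");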
Re: Help optimising Spark SQL query
Thanks for the suggestions everyone, appreciate the advice. I tried replacing the DISTINCT with a nested GROUP BY, running on 1.4 instead of 1.3, replacing the date casts with a BETWEEN on the corresponding long constants, and changing COUNT(*) to COUNT(1). None of these seems to have made any remarkable difference in running time for the query. I'll hook up YourKit and see if we can figure out where the CPU time is going, then post back.

On 22 June 2015 at 16:01, Yin Huai yh...@databricks.com wrote:

Hi James,

Maybe it's the DISTINCT causing the issue. I rewrote the query as follows. Maybe this one can finish faster.

    select sum(cnt) as uses, count(id) as users
    from (
      select count(*) cnt, cast(id as string) as id
      from usage_events
      where from_unixtime(cast(timestamp_millis/1000 as bigint))
        between '2015-06-09' and '2015-06-16'
      group by cast(id as string)
    ) tmp

Thanks,
Yin

On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke jornfra...@gmail.com wrote:

Generally (not only Spark SQL specific), you should not cast in the WHERE part of a SQL query. It is also not necessary in your case. Getting rid of casts in the whole query will be beneficial too.

Le lun. 22 juin 2015 à 17:29, James Aley james.a...@swiftkey.com a écrit :

Hello,

A colleague of mine ran the following Spark SQL query:

    select
      count(*) as uses,
      count(distinct cast(id as string)) as users
    from usage_events
    where from_unixtime(cast(timestamp_millis/1000 as bigint))
      between '2015-06-09' and '2015-06-16'

The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3.

From the referenced columns:

* id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.)
* timestamp_millis is a long, containing a unix timestamp with millisecond resolution

This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.

Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves.

A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case) just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues.

Many thanks,
James.
Help optimising Spark SQL query
Hello,

A colleague of mine ran the following Spark SQL query:

    select
      count(*) as uses,
      count(distinct cast(id as string)) as users
    from usage_events
    where from_unixtime(cast(timestamp_millis/1000 as bigint))
      between '2015-06-09' and '2015-06-16'

The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3.

From the referenced columns:

* id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.)
* timestamp_millis is a long, containing a unix timestamp with millisecond resolution

This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.

Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves.

A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case) just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues.

Many thanks,
James.
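A sketch of the cast-free variant mentioned later in this thread: compare the raw long column against precomputed constants instead of converting per row. The constants below are the UTC-midnight epoch millis for the two dates in the original query (illustrative only; check your timezone semantics):

    DataFrame counts = sqlContext.sql(
        "SELECT COUNT(1) AS uses, " +
        "       COUNT(DISTINCT CAST(id AS STRING)) AS users " +
        "FROM usage_events " +
        "WHERE timestamp_millis BETWEEN 1433808000000 AND 1434412800000");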
Re: Help optimising Spark SQL query
Thanks for the responses, guys! Sorry, I forgot to mention that I'm using Spark 1.3.0, but I'll test with 1.4.0 and try the codegen suggestion, then report back.

On 22 June 2015 at 12:37, Matthew Johnson matt.john...@algomi.com wrote:

Hi James,

What version of Spark are you using? In Spark 1.2.2 I had an issue where Spark would report a job as complete but I couldn't find my results anywhere - I just assumed it was me doing something wrong, as I am still quite new to Spark. However, since upgrading to 1.4.0 I have not seen this issue, so it might be worth upgrading if you are not already on 1.4.

Cheers,
Matthew

From: Lior Chaga [mailto:lio...@taboola.com]
Sent: 22 June 2015 17:24
To: James Aley
Cc: user
Subject: Re: Help optimising Spark SQL query

Hi James,

There are a few configuration options that you can try: https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options

From my experience, codegen really boosts things up. Just run sqlContext.sql("SET spark.sql.codegen=true") before you execute your query. But keep in mind that this is sometimes buggy (depending on your query), so compare the results to those without codegen to be sure.

Also, you can try changing the default number of partitions.

You can also use DataFrames (since 1.3). I'm not sure they are better than specifying the query in 1.3, but with Spark 1.4 there should be an enormous performance improvement in DataFrames.

Lior

On Mon, Jun 22, 2015 at 6:28 PM, James Aley james.a...@swiftkey.com wrote:

Hello,

A colleague of mine ran the following Spark SQL query:

    select
      count(*) as uses,
      count(distinct cast(id as string)) as users
    from usage_events
    where from_unixtime(cast(timestamp_millis/1000 as bigint))
      between '2015-06-09' and '2015-06-16'

The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3.

From the referenced columns:

* id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.)
* timestamp_millis is a long, containing a unix timestamp with millisecond resolution

This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.

Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves.

A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case) just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues.

Many thanks,
James.
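For reference, the codegen setting Lior mentions can be applied either programmatically or as a SQL statement - a quick sketch, assuming a SQLContext in scope as sqlContext:

    // Programmatically:
    sqlContext.setConf("spark.sql.codegen", "true");

    // Or as a SQL statement before running the query:
    sqlContext.sql("SET spark.sql.codegen=true");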
Re: Optimisation advice for Avro-Parquet merge job
Hey Kiran,

Thanks very much for the response. I left for vacation before I could try this out, but I'll experiment once I get back and let you know how it goes.

Thanks!
James.

On 8 June 2015 at 12:34, kiran lonikar loni...@gmail.com wrote:

It turns out my assumption that load and unionAll are blocking is not correct. They are transformations. So instead of running only the load and unionAll in the run() methods, I think you will have to save the intermediate dfInput[i] to temp (parquet) files (possibly to an in-memory DFS like http://tachyon-project.org/) in the run() methods. The second for loop will also have to load from the intermediate parquet files. Then finally save the final dfInput[0] to HDFS. I think this way of parallelizing will force the cluster to utilize all the resources.

-Kiran

On Mon, Jun 8, 2015 at 12:30 PM, kiran lonikar loni...@gmail.com wrote:

James,

As I can see, there are three distinct parts to your program:

- the for loop
- the synchronized block
- the final outputFrame.save statement

Can you do a separate timing measurement by putting a simple System.currentTimeMillis() around these blocks to know how much each is taking, and then try to optimize where it takes longest? In the second block, you may want to measure the time for the two statements separately. Improving this boils down to playing with Spark settings.

Now consider the first block: I think this is a classic case of merge sort or a reduce tree. You already tried to improve this by submitting jobs in parallel using an executor pool/Callable etc. To further improve the parallelization, I suggest you use a reduce-tree-like approach. For example, let's say you want to compute the sum of all elements of an array in parallel. The way it's solved for a GPU-like platform is: you divide your input array into pairs, compute those n/2 sums in parallel on separate threads, and save each result in the first of the two elements. In the next iteration, you compute n/4 sums in parallel from the earlier sums, and so on, until you are left with only two elements whose sum gives you the final sum.

You are performing many sequential unionAll operations for inputs.size() Avro files. Assuming the unionAll() on DataFrame is blocking (and not a simple transformation like on RDDs) and actually performs the union operation, you will certainly benefit from parallelizing this loop. You may change the loop to something like below:

    // pseudo code only
    final int n = inputs.size();

    // initialize executor
    ExecutorService executor = Executors.newFixedThreadPool(n / 2);
    final DataFrame[] dfInput = new DataFrame[n / 2];

    for (int i = 0; i < n / 2; i++) {
        final int fi = i;
        executor.submit(new Runnable() {
            public void run() {
                // union of i and i+n/2
                // showing [] only to bring out array access. Replace with
                // dfInput(i) in your code
                dfInput[fi] = sqlContext.load(inputPaths.get(fi), "com.databricks.spark.avro")
                    .unionAll(sqlContext.load(inputPaths.get(fi + n / 2), "com.databricks.spark.avro"));
            }
        });
    }
    // wait for this round to finish (in real code, collect the Futures or
    // use a proper barrier rather than awaitTermination on a live executor)
    executor.awaitTermination(0, TimeUnit.SECONDS);

    int steps = (int) (Math.log(n) / Math.log(2.0));
    for (int s = 2; s < steps; s++) {
        final int stride = n / (1 << s); // n/(2^s)
        for (int i = 0; i < stride; i++) {
            final int fi = i;
            executor.submit(new Runnable() {
                public void run() {
                    // union of i and i+stride
                    // showing [] only to bring out array access. Replace with
                    // dfInput(i) and dfInput(i+stride) in your code
                    dfInput[fi] = dfInput[fi].unionAll(dfInput[fi + stride]);
                }
            });
        }
        executor.awaitTermination(0, TimeUnit.SECONDS);
    }

Let me know if it helped.

-Kiran

On Thu, Jun 4, 2015 at 8:57 PM, James Aley james.a...@swiftkey.com wrote:

Thanks for the confirmation!

We're quite new to Spark, so a little reassurance is a good thing to have sometimes :-)

The thing that's concerning me at the moment is that my job doesn't seem to run any faster with more compute resources added to the cluster, and this is proving a little tricky to debug. There are a lot of variables, so here's what we've tried already and the apparent impact. If anyone has any further suggestions, we'd love to hear!

* Increase the minimum number of output files (targetPartitions above), so that input groups smaller than our minimum chunk size can still be worked on by more than one executor. This does measurably speed things up, but obviously it's a trade-off, as the original goal of this job is to merge our data into fewer, larger files.

* Submit many jobs in parallel, by running the above code in a Callable on an executor pool. This seems to help to some extent, but I'm not sure what else needs to be configured alongside it -- driver threads, scheduling policy, etc. We set scheduling to FAIR when doing this, as that seemed like the right approach, but we're not 100% confident. It seemed to help quite substantially anyway, so perhaps this just needs further tuning?

* Increasing executors, RAM, etc. This doesn't make a difference by itself for this job, so I'm thinking we're already not fully utilising the resources we have in a smaller cluster.
Re: Optimisation advice for Avro-Parquet merge job
Thanks for the confirmation!

We're quite new to Spark, so a little reassurance is a good thing to have sometimes :-)

The thing that's concerning me at the moment is that my job doesn't seem to run any faster with more compute resources added to the cluster, and this is proving a little tricky to debug. There are a lot of variables, so here's what we've tried already and the apparent impact. If anyone has any further suggestions, we'd love to hear!

* Increase the minimum number of output files (targetPartitions above), so that input groups smaller than our minimum chunk size can still be worked on by more than one executor. This does measurably speed things up, but obviously it's a trade-off, as the original goal of this job is to merge our data into fewer, larger files.

* Submit many jobs in parallel, by running the above code in a Callable on an executor pool. This seems to help to some extent, but I'm not sure what else needs to be configured alongside it -- driver threads, scheduling policy, etc. We set scheduling to FAIR when doing this, as that seemed like the right approach, but we're not 100% confident. It seemed to help quite substantially anyway, so perhaps this just needs further tuning?

* Increasing executors, RAM, etc. This doesn't make a difference by itself for this job, so I'm thinking we're already not fully utilising the resources we have in a smaller cluster.

Again, any recommendations appreciated. Thanks for the help!

James.

On 4 June 2015 at 15:00, Eugen Cepoi cepoi.eu...@gmail.com wrote:

Hi

2015-06-04 15:29 GMT+02:00 James Aley james.a...@swiftkey.com:

Hi,

We have a load of Avro data coming into our data systems in the form of relatively small files, which we're merging into larger Parquet files with Spark. I've been following the docs, and the approach I'm taking seemed fairly obvious and pleasingly simple, but I'm wondering if perhaps it's not the most optimal approach. I was wondering if anyone on this list might have some advice to make this job as efficient as possible. Here's some code:

    DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro");
    long totalSize = getDirSize(inputPaths.get(0));

    for (int i = 1; i < inputPaths.size(); ++i) {
        dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro"));
        totalSize += getDirSize(inputPaths.get(i));
    }

    int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);

    DataFrame outputFrame;

    // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
    // the synchronized block below. Suggestions welcome here too! :-)
    synchronized (this) {
        RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
        outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
    }

    outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

Here are some things bothering me:

- Conversion to an RDD and back again so that we can use coalesce() to reduce the number of partitions. This is because we read that repartition() is not as efficient as coalesce(), and local micro benchmarks seemed to somewhat confirm that this was faster. Is this really a good idea though? Should we be doing something else?

Repartition uses coalesce but with a forced shuffle step. It's just a shortcut for coalesce(xxx, true). Doing a coalesce sounds correct; I'd do the same :) Note that if you add the shuffle step, then your partitions should be better balanced.

- Usage of unionAll() - this is the only way I could find to join the separate data sets into a single data frame to save as Parquet. Is there a better way?

When using the input formats directly, you can do this with FileInputFormat.addInputPath; it should perform at least as well as union.

- Do I need to be using the DataFrame API at all? I'm not querying any data here, so the nice API for SQL-like transformations of the data isn't being used. The DataFrame API just seemed like the path of least resistance for working with Avro and Parquet. Would there be any advantage to using hadoopRDD() with the appropriate Input/Output formats?

Using the input/output formats directly sounds viable. But the snippet you show seems clean enough, and I am not sure there would be much value in making something (maybe) slightly faster but harder to understand.

Eugen

Any advice or tips greatly appreciated!

James.
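One more option worth noting for anyone on Spark 1.4+: DataFrame gained its own coalesce(), which would avoid the RDD round-trip (and the HADOOP-10456 workaround) entirely. A sketch, assuming the same variables as the snippet above:

    // Coalesce directly on the DataFrame and write via the 1.4 writer API:
    DataFrame outputFrame = dfInput.coalesce(targetPartitions);
    outputFrame.write().mode(SaveMode.Overwrite).parquet(outputPath);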
Optimisation advice for Avro-Parquet merge job
Hi,

We have a load of Avro data coming into our data systems in the form of relatively small files, which we're merging into larger Parquet files with Spark. I've been following the docs, and the approach I'm taking seemed fairly obvious and pleasingly simple, but I'm wondering if perhaps it's not the most optimal approach. I was wondering if anyone on this list might have some advice to make this job as efficient as possible. Here's some code:

    DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro");
    long totalSize = getDirSize(inputPaths.get(0));

    for (int i = 1; i < inputPaths.size(); ++i) {
        dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro"));
        totalSize += getDirSize(inputPaths.get(i));
    }

    int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);

    DataFrame outputFrame;

    // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
    // the synchronized block below. Suggestions welcome here too! :-)
    synchronized (this) {
        RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
        outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
    }

    outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

Here are some things bothering me:

- Conversion to an RDD and back again so that we can use coalesce() to reduce the number of partitions. This is because we read that repartition() is not as efficient as coalesce(), and local micro benchmarks seemed to somewhat confirm that this was faster. Is this really a good idea though? Should we be doing something else?

- Usage of unionAll() - this is the only way I could find to join the separate data sets into a single data frame to save as Parquet. Is there a better way?

- Do I need to be using the DataFrame API at all? I'm not querying any data here, so the nice API for SQL-like transformations of the data isn't being used. The DataFrame API just seemed like the path of least resistance for working with Avro and Parquet. Would there be any advantage to using hadoopRDD() with the appropriate Input/Output formats?

Any advice or tips greatly appreciated!

James.
[Spark SQL] Problems creating a table in specified schema/database
Hey all,

I'm trying to create tables from existing Parquet data in different schemata. The following isn't working for me:

    CREATE DATABASE foo;

    CREATE TABLE foo.bar
    USING com.databricks.spark.avro
    OPTIONS (path '...');

    -- Error: org.apache.spark.sql.AnalysisException: cannot recognize input
    -- near 'USING' 'com' '.' in table name; line 1 pos 13 (state=,code=0)

I also tried:

    USE foo;

    CREATE TABLE bar
    USING com.databricks.spark.avro
    OPTIONS (path '...');

    -- Creates the table successfully, but in the default.* schema.

This is on Spark 1.3.1, running on YARN, Hive 0.13.1.

Any suggestions? Should this work?

James.
Advice using Spark SQL and Thrift JDBC Server
Hello,

First of all, thank you to everyone working on Spark. I've only been using it for a few weeks now, but so far I'm really enjoying it. You saved me from a big, scary elephant! :-)

I was wondering if anyone might be able to offer some advice about working with the Thrift JDBC server? I'm trying to enable members of my team to connect and run some basic SQL queries on a Spark cluster using their favourite JDBC tools. Following the docs [1], I've managed to get something simple up and running, but I'd really appreciate it if someone could validate my understanding here, as the docs don't go deeply into the details. Here are a few questions I've not been able to find answers to myself:

1) What exactly is the relationship between the thrift server and Hive? I'm guessing Spark is just making use of the Hive metastore to access table definitions, and maybe some other things. Is that the case?

2) Am I therefore right in thinking that SQL queries sent to the thrift server are still executed on the Spark cluster, using Spark SQL, and Hive plays no active part in computation of results?

3) What SQL flavour is actually supported by the Thrift Server? Is it Spark SQL, Hive, or both? I'm confused, because I've seen it accepting Hive CREATE TABLE syntax, but Spark SQL seems to work too.

4) When I run SQL queries using the Scala or Python shells, Spark seems to figure out the schema by itself from my Parquet files very well, if I use registerTempTable on the DataFrame. It seems when running the thrift server, I need to create a Hive table definition first? Is that the case, or did I miss something? If it is, is there some sensible way to automate this?

Many thanks!

James

[1] https://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbcodbc-server
Re: Advice using Spark SQL and Thrift JDBC Server
Hi Michael,

Thanks so much for the reply - that really cleared a lot of things up for me! Let me just check that I've interpreted one of your suggestions for (4) correctly... Would it make sense for me to write a small wrapper app that pulls in hive-thriftserver as a dependency, iterates my Parquet directory structure to discover tables, and registers each as a temp table in some context, before calling HiveThriftServer2.startWithContext as you suggest?

This would mean that to add new content, all I need to do is restart that app, which presumably could also be avoided fairly trivially by periodically restarting the server with a new context internally. That certainly beats manual curation of Hive table definitions, if it will work!

Thanks again,
James.

On 7 April 2015 at 19:30, Michael Armbrust mich...@databricks.com wrote:

1) What exactly is the relationship between the thrift server and Hive? I'm guessing Spark is just making use of the Hive metastore to access table definitions, and maybe some other things. Is that the case?

Underneath the covers, the Spark SQL thrift server is executing queries using a HiveContext. In this mode, nearly all computation is done with Spark SQL, but we try to maintain compatibility with Hive wherever possible. This means that you can write your queries in HiveQL, read tables from the Hive metastore, and use Hive UDFs, UDTs, UDAFs, etc.

The one exception here is Hive DDL operations (CREATE TABLE, etc). These are passed directly to Hive code and executed there. The Spark SQL DDL is sufficiently different that we always try to parse that first, and fall back to Hive when it does not parse.

One possibly confusing point here is that you can persist Spark SQL tables into the Hive metastore, but this is not the same as a Hive table. We are only using the metastore as a repo for metadata, but are not using its format for the information in this case (as we have data sources that Hive does not understand, including things like schema auto-discovery).

HiveQL DDL, run by Hive but readable by Spark SQL:

    CREATE TABLE t (x INT) STORED AS PARQUET

Spark SQL DDL, run by Spark SQL and stored in the metastore, but not readable by Hive:

    CREATE TABLE t USING parquet (path '/path/to/data')

2) Am I therefore right in thinking that SQL queries sent to the thrift server are still executed on the Spark cluster, using Spark SQL, and Hive plays no active part in computation of results?

Correct.

3) What SQL flavour is actually supported by the Thrift Server? Is it Spark SQL, Hive, or both? I'm confused, because I've seen it accepting Hive CREATE TABLE syntax, but Spark SQL seems to work too.

HiveQL++ (with Spark SQL DDL). You can make it use our simple SQL parser by running SET spark.sql.dialect=sql, but honestly you probably don't want to do this. The included SQL parser is mostly there for people who have dependency conflicts with Hive.

4) When I run SQL queries using the Scala or Python shells, Spark seems to figure out the schema by itself from my Parquet files very well, if I use registerTempTable on the DataFrame. It seems when running the thrift server, I need to create a Hive table definition first? Is that the case, or did I miss something? If it is, is there some sensible way to automate this?

Temporary tables are only visible to the SQLContext that creates them. If you want them to be visible to the server, you need to either start the thrift server with the same context your program is using (see HiveThriftServer2.startWithContext) or make a metastore table. This can be done using Spark SQL DDL:

    CREATE TABLE t USING parquet (path '/path/to/data')

Michael
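A rough sketch of that wrapper app in Java (Spark 1.3-era APIs; the directory list and table-naming scheme are placeholders for whatever discovery logic you'd actually use):

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.hive.HiveContext;
    import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2;

    public class ParquetThriftApp {
      public static void main(String[] args) {
        SparkContext sc = new SparkContext(new SparkConf().setAppName("parquet-thrift"));
        HiveContext hive = new HiveContext(sc);

        // Placeholder discovery: in reality, list the Parquet root directory.
        String[] tableDirs = {"/data/tables/events", "/data/tables/users"};
        for (String dir : tableDirs) {
          DataFrame df = hive.parquetFile(dir); // read().parquet(dir) on 1.4+
          String name = dir.substring(dir.lastIndexOf('/') + 1);
          df.registerTempTable(name);
        }

        // Expose the temp tables over JDBC by starting the thrift server
        // with this same context:
        HiveThriftServer2.startWithContext(hive);
      }
    }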