Will nested field performance improve?

2016-04-15 Thread James Aley
Hello,

I'm trying to make a call on whether my team should invest time adding a
step to "flatten" our schema as part of our ETL pipeline, to improve the
performance of interactive queries.

Our data starts out life as Avro before being converted to Parquet, and so
we follow the Avro idiom of creating our own types to reduce boilerplate in
many areas. For example, every record we define has a "metadata" struct
field containing the fields that are common to all records by design.
Because those fields are so widely used, virtually all queries need to
access them. As a result, nearly all of our queries fall short of the
performance we could be seeing in Spark SQL, etc.

So my question is: is this just inherently the way it is, or do we expect
future releases to put nested fields on a par with flat fields? The reason
I ask is that I've seen similar differences in performance with Presto too.
In benchmarks for both Spark and Presto, I generally see queries working on
flat fields run 5-6x faster than queries doing the same thing on a nested
field.

If we expect fields nested in structs to always be much slower than flat
fields, then I would be keen to address that in our ETL pipeline with a
flattening step. If it's a known issue that we expect will be fixed in
upcoming releases, I'll hold off.
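
To make that concrete, the flattening step I have in mind is roughly the
following (just a sketch -- the field names and paths are made up, not our
real schema):

import org.apache.spark.sql.DataFrame;
import static org.apache.spark.sql.functions.col;

// Hypothetical nested layout: metadata.client_id, metadata.timestamp plus
// event-specific fields. Promote the common metadata fields to top-level
// columns before writing Parquet.
DataFrame nested = sqlContext.read().parquet("s3://bucket/events/nested/");
DataFrame flat = nested.select(
    col("metadata.client_id").alias("client_id"),
    col("metadata.timestamp").alias("timestamp"),
    col("payload"));  // keep the remaining event-specific fields as-is
flat.write().parquet("s3://bucket/events/flat/");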

Any advice greatly appreciated!

Thanks,

James.


Unreachable dead objects permanently retained on heap

2015-09-25 Thread James Aley
Hi,

We have an application that submits several thousand jobs within the same
SparkContext, using a thread pool to run about 50 in parallel. We're
running on YARN with Spark 1.4.1 and seeing a problem where our driver is
killed by YARN for running beyond physical memory limits (though with no
Java OOM stack trace).

Plugging in YourKit, I can see that the application is in fact running low
on heap. The suspicious thing is that the old generation is filling up with
dead objects, which don't seem to be fully removed during the
stop-the-world sweeps we see happening later in the application's run.

With allocation tracking enabled, I can see that maybe 80%+ of that dead
heap space consists of byte arrays, which appear to contain some
snappy-compressed Hadoop configuration data. Many of them are 4MB each,
others hundreds of KB. The allocation tracking reveals that they were
originally allocated in calls to sparkContext.hadoopFile() (from
AvroRelation in spark-avro). It seems that this data was broadcast to the
executors as a result of that call? I'm not clear on the implementation
details, but I can imagine that might be necessary.

This application is essentially a batch job that takes many Avro files and
merges them into larger Parquet files. It builds a DataFrame for each group
of Avro files, then for each DataFrame starts a job that calls
.coalesce(N).write().parquet() on a fixed-size thread pool.
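
In outline, the driver side looks something like this (simplified sketch --
sqlContext, inputPaths, N and outputPathFor() stand in for our real code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.spark.sql.DataFrame;

// One merge job per input group, with roughly 50 running at a time.
ExecutorService pool = Executors.newFixedThreadPool(50);
for (final String inputPath : inputPaths) {
    pool.submit(new Runnable() {
        public void run() {
            DataFrame df = sqlContext.read()
                .format("com.databricks.spark.avro")
                .load(inputPath);
            df.coalesce(N).write().parquet(outputPathFor(inputPath));
        }
    });
}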

It seems that with each of those calls, another chunk of heap space
disappears into one of these byte arrays and is never reclaimed. I
understand that broadcast variables remain in memory on the driver in their
serialized form, and that at least appears consistent with what I'm seeing
here. The question is: what can we do about this? Is there a way to reclaim
this memory? Should those arrays be GC'd when jobs finish?

Any guidance greatly appreciated.


Many thanks,

James.


Java UDFs in GROUP BY expressions

2015-09-07 Thread James Aley
Hi everyone,

I raised this JIRA ticket back in July:
https://issues.apache.org/jira/browse/SPARK-9435

The problem is that Spark SQL doesn't seem to recognise columns we
transform with a UDF when they're referenced in the GROUP BY clause.
There's a minimal Java reproduction attached to the ticket to illustrate
the issue.

The equivalent code in Scala works fine for me. Is anyone else seeing this
problem? For us, the attached code fails every time on Spark 1.4.1.
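
The failing code has roughly this shape (illustrative sketch only, not the
attached file -- the UDF and table/column names here are made up):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// Register a trivial Java UDF, then reference it in a GROUP BY.
sqlContext.udf().register("normalise", new UDF1<String, String>() {
    public String call(String s) {
        return s == null ? null : s.toLowerCase();
    }
}, DataTypes.StringType);

DataFrame result = sqlContext.sql(
    "SELECT normalise(id) AS id, COUNT(*) AS cnt " +
    "FROM events " +
    "GROUP BY normalise(id)");
result.show();  // a query of this shape fails for us from Java, but works from Scala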


Thanks,

James


Re: Help optimising Spark SQL query

2015-06-30 Thread James Aley
Thanks everybody for the advice on this.

I attached YourKit and found that the CPU time split was about 70% in
Parquet/LZO reading and 30% applying the filter predicate. I guess those
are reasonable things for it to be spending time on, and so it really could
just be a case of needing more hardware to cope with that volume of rows.
That's not such a problem, as the cluster wasn't exactly huge when testing
- just a couple of nodes.

Further, we've not been making use of the partitioning support for Parquet
data, which would give us a simple way to control how much historical data
we go sifting through. It turns out we're already writing our data as
type/timestamp/parquet files; we just missed the date= naming convention -
d'oh! At least that means a fairly simple rename script should get us out
of trouble!
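
For reference, the partitioned layout/write we're moving to is roughly the
following (sketch only -- paths and the date column are illustrative, and
this relies on DataFrameWriter.partitionBy from Spark 1.4):

import org.apache.spark.sql.DataFrame;

// Write events partitioned by date, so partition discovery can prune whole
// directories when a query filters on it. Assumes a date column exists (or
// is derived from timestamp_millis upstream).
DataFrame events = sqlContext.read().parquet("s3://bucket/events/raw/");
events.write()
      .partitionBy("date")
      .parquet("s3://bucket/events/by_date/");
// Result: .../by_date/date=2015-06-09/part-*.parquet, and so on.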

Appreciate everyone's tips, thanks again!

James.


On 23 June 2015 at 17:25, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:

 64GB in parquet could be many billions of rows because of the columnar
 compression. And count distinct by itself is an expensive operation. This
 is not just Spark: even on Presto/Impala you would see performance dip
 with count distincts. And the cluster is not that powerful either.

 The one issue here is that Spark has to sift through all the data to get
 to just a week's worth. To achieve better performance you might want to
 partition the data by date/week and then Spark wouldn't have to sift
 through all the billions of rows to get to the millions it needs to
 aggregate.

 Regards
 Sab

 On Tue, Jun 23, 2015 at 4:35 PM, James Aley james.a...@swiftkey.com
 wrote:

 Thanks for the suggestions everyone, appreciate the advice.

 I tried replacing the DISTINCT with the nested GROUP BY, running on 1.4
 instead of 1.3, replacing the date casts with a BETWEEN on the
 corresponding long constants, and changing COUNT(*) to COUNT(1). None of
 these seem to have made any noticeable difference in running time for the
 query.

 I'll hook up YourKit and see if we can figure out where the CPU time is
 going, then post back.

 On 22 June 2015 at 16:01, Yin Huai yh...@databricks.com wrote:

 Hi James,

 Maybe it's the DISTINCT causing the issue.

 I rewrote the query as follows. Maybe this one can finish faster.

 select
   sum(cnt) as uses,
   count(id) as users
 from (
   select
     count(*) cnt,
     cast(id as string) as id
   from usage_events
   where
     from_unixtime(cast(timestamp_millis/1000 as bigint)) between
       '2015-06-09' and '2015-06-16'
   group by cast(id as string)
 ) tmp

 Thanks,

 Yin

 On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke jornfra...@gmail.com
 wrote:

 Generally (not only spark sql specific) you should not cast in the
 where part of a sql query. It is also not necessary in your case. Getting
 rid of casts in the whole query will be also beneficial.

 Le lun. 22 juin 2015 à 17:29, James Aley james.a...@swiftkey.com a
 écrit :

 Hello,

 A colleague of mine ran the following Spark SQL query:

 select
   count(*) as uses,
   count (distinct cast(id as string)) as users
 from usage_events
 where
   from_unixtime(cast(timestamp_millis/1000 as bigint))
 between '2015-06-09' and '2015-06-16'

 The table contains billions of rows, but totals only 64GB of data across
 ~30 separate files, which are stored as Parquet with LZO compression in S3.

 From the referenced columns:

 * id is Binary, which we cast to a String so that we can DISTINCT by
 it. (I was already told this will improve in a later release, in a 
 separate
 thread.)
 * timestamp_millis is a long, containing a unix timestamp with
 millisecond resolution

 This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
 instances, using 20 executors, each with 4GB memory. I can see from
 monitoring tools that the CPU usage is at 100% on all nodes, but incoming
 network seems a bit low at 2.5MB/s, suggesting to me that this is 
 CPU-bound.

 Does that seem slow? Can anyone offer any ideas by glancing at the
 query as to why this might be slow? We'll profile it meanwhile and post
 back if we find anything ourselves.

 A side issue - I've found that this query, and others, sometimes
 completes but doesn't return any results. There appears to be no error 
 that
 I can see in the logs, and Spark reports the job as successful, but the
 connected JDBC client (SQLWorkbenchJ in this case), just sits there 
 forever
 waiting. I did a quick Google and couldn't find anyone else having similar
 issues.


 Many thanks,

 James.






 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++



Re: Help optimising Spark SQL query

2015-06-23 Thread James Aley
Thanks for the suggestions everyone, appreciate the advice.

I tried replacing the DISTINCT with the nested GROUP BY, running on 1.4
instead of 1.3, replacing the date casts with a BETWEEN on the
corresponding long constants, and changing COUNT(*) to COUNT(1). None of
these seem to have made any noticeable difference in running time for the
query.

I'll hook up YourKit and see if we can figure out where the CPU time is
going, then post back.

On 22 June 2015 at 16:01, Yin Huai yh...@databricks.com wrote:

 Hi James,

 Maybe it's the DISTINCT causing the issue.

 I rewrote the query as follows. Maybe this one can finish faster.

 select
   sum(cnt) as uses,
   count(id) as users
 from (
   select
     count(*) cnt,
     cast(id as string) as id
   from usage_events
   where
     from_unixtime(cast(timestamp_millis/1000 as bigint)) between
       '2015-06-09' and '2015-06-16'
   group by cast(id as string)
 ) tmp

 Thanks,

 Yin

 On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke jornfra...@gmail.com
 wrote:

 Generally (not only spark sql specific) you should not cast in the where
 part of a sql query. It is also not necessary in your case. Getting rid of
 casts in the whole query will be also beneficial.

 Le lun. 22 juin 2015 à 17:29, James Aley james.a...@swiftkey.com a
 écrit :

 Hello,

 A colleague of mine ran the following Spark SQL query:

 select
   count(*) as uses,
   count (distinct cast(id as string)) as users
 from usage_events
 where
   from_unixtime(cast(timestamp_millis/1000 as bigint))
 between '2015-06-09' and '2015-06-16'

 The table contains billions of rows, but totals only 64GB of data across
 ~30 separate files, which are stored as Parquet with LZO compression in S3.

 From the referenced columns:

 * id is Binary, which we cast to a String so that we can DISTINCT by
 it. (I was already told this will improve in a later release, in a separate
 thread.)
 * timestamp_millis is a long, containing a unix timestamp with
 millisecond resolution

 This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
 instances, using 20 executors, each with 4GB memory. I can see from
 monitoring tools that the CPU usage is at 100% on all nodes, but incoming
 network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.

 Does that seem slow? Can anyone offer any ideas by glancing at the query
 as to why this might be slow? We'll profile it meanwhile and post back if
 we find anything ourselves.

 A side issue - I've found that this query, and others, sometimes
 completes but doesn't return any results. There appears to be no error that
 I can see in the logs, and Spark reports the job as successful, but the
 connected JDBC client (SQLWorkbenchJ in this case), just sits there forever
 waiting. I did a quick Google and couldn't find anyone else having similar
 issues.


 Many thanks,

 James.





Help optimising Spark SQL query

2015-06-22 Thread James Aley
Hello,

A colleague of mine ran the following Spark SQL query:

select
  count(*) as uses,
  count (distinct cast(id as string)) as users
from usage_events
where
  from_unixtime(cast(timestamp_millis/1000 as bigint))
between '2015-06-09' and '2015-06-16'

The table contains billions of rows, but totals only 64GB of data across
~30 separate files, which are stored as Parquet with LZO compression in S3.

From the referenced columns:

* id is Binary, which we cast to a String so that we can DISTINCT by it. (I
was already told this will improve in a later release, in a separate
thread.)
* timestamp_millis is a long, containing a unix timestamp with millisecond
resolution

This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
instances, using 20 executors, each with 4GB memory. I can see from
monitoring tools that the CPU usage is at 100% on all nodes, but incoming
network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.

Does that seem slow? Can anyone offer any ideas by glancing at the query as
to why this might be slow? We'll profile it meanwhile and post back if we
find anything ourselves.

A side issue - I've found that this query, and others, sometimes completes
but doesn't return any results. There appears to be no error that I can see
in the logs, and Spark reports the job as successful, but the connected
JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting.
I did a quick Google and couldn't find anyone else having similar issues.


Many thanks,

James.


Re: Help optimising Spark SQL query

2015-06-22 Thread James Aley
Thanks for the responses, guys!

Sorry, I forgot to mention that I'm using Spark 1.3.0, but I'll test with
1.4.0 and try the codegen suggestion then report back.


On 22 June 2015 at 12:37, Matthew Johnson matt.john...@algomi.com wrote:

 Hi James,



 What version of Spark are you using? In Spark 1.2.2 I had an issue where
 Spark would report a job as complete but I couldn’t find my results
 anywhere – I just assumed it was me doing something wrong as I am still
 quite new to Spark. However, since upgrading to 1.4.0 I have not seen this
 issue, so might be worth upgrading if you are not already on 1.4.



 Cheers,

 Matthew





 *From:* Lior Chaga [mailto:lio...@taboola.com]
 *Sent:* 22 June 2015 17:24
 *To:* James Aley
 *Cc:* user
 *Subject:* Re: Help optimising Spark SQL query



 Hi James,



 There are a few configurations that you can try:


 https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options



 From my experience, codegen really boosts things up. Just run
 sqlContext.sql("SET spark.sql.codegen=true") before you execute your
 query. But keep in mind that this is sometimes buggy (depending on your
 query), so compare against results without codegen to be sure.

 Also you can try changing the default partitions.



 You can also use dataframes (since 1.3). Not sure they are better than
 specifying the query in 1.3, but with spark 1.4 there should be an enormous
 performance improvement in dataframes.



 Lior



 On Mon, Jun 22, 2015 at 6:28 PM, James Aley james.a...@swiftkey.com
 wrote:

 Hello,



 A colleague of mine ran the following Spark SQL query:



 select
   count(*) as uses,
   count (distinct cast(id as string)) as users
 from usage_events
 where
   from_unixtime(cast(timestamp_millis/1000 as bigint))
   between '2015-06-09' and '2015-06-16'



 The table contains billions of rows, but totals only 64GB of data across
 ~30 separate files, which are stored as Parquet with LZO compression in S3.



 From the referenced columns:



 * id is Binary, which we cast to a String so that we can DISTINCT by it.
 (I was already told this will improve in a later release, in a separate
 thread.)

 * timestamp_millis is a long, containing a unix timestamp with
 millisecond resolution



 This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
 instances, using 20 executors, each with 4GB memory. I can see from
 monitoring tools that the CPU usage is at 100% on all nodes, but incoming
 network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.



 Does that seem slow? Can anyone offer any ideas by glancing at the query
 as to why this might be slow? We'll profile it meanwhile and post back if
 we find anything ourselves.



 A side issue - I've found that this query, and others, sometimes completes
 but doesn't return any results. There appears to be no error that I can see
 in the logs, and Spark reports the job as successful, but the connected
 JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting.
 I did a quick Google and couldn't find anyone else having similar issues.





 Many thanks,



 James.





Re: Optimisation advice for Avro-Parquet merge job

2015-06-12 Thread James Aley
Hey Kiran,

Thanks very much for the response. I left for vacation before I could try
this out, but I'll experiment once I get back and let you know how it goes.

Thanks!

James.

On 8 June 2015 at 12:34, kiran lonikar loni...@gmail.com wrote:

 It turns out my assumption that load and unionAll are blocking is not
 correct. They are transformations. So instead of running only the load and
 unionAll in the run() methods, I think you will have to save the
 intermediate dfInput[i] to temporary (parquet) files (possibly to an
 in-memory DFS like http://tachyon-project.org/) in the run() methods. The
 second for loop will also have to load from the intermediate parquet
 files. Then finally save the final dfInput[0] to HDFS.

 I think this way of parallelizing will force the cluster to utilize the
 all the resources.

 -Kiran

 On Mon, Jun 8, 2015 at 12:30 PM, kiran lonikar loni...@gmail.com wrote:

 James,

 As far as I can see, there are three distinct parts to your program:

- for loop
- synchronized block
- final outputFrame.save statement

 Can you do a separate timing measurement by putting a simple
 System.currentTimeMillis() around each of these blocks, to see how long
 each one takes, and then try to optimize wherever it takes longest? In the
 second block, you may want to measure the time for the two statements
 separately. Improving this mostly boils down to playing with Spark
 settings.
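
 For example (sketch only, based on the code from your earlier mail):

 long t0 = System.currentTimeMillis();
 // ... the for loop that loads and unionAll()s the inputs ...
 long t1 = System.currentTimeMillis();
 // ... the synchronized block with coalesce() + createDataFrame() ...
 long t2 = System.currentTimeMillis();
 outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
 long t3 = System.currentTimeMillis();
 System.out.println("union: " + (t1 - t0) + " ms, coalesce: " + (t2 - t1)
     + " ms, save: " + (t3 - t2) + " ms");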

 Now consider the first block: I think this is a classic case of merge
 sort or a reduce tree. You already tried to improve this by submitting jobs
 in parallel using executor pool/Callable etc.

 To further improve the parallelization, I suggest you use a reduce-tree
 like approach. For example, let's say you want to compute the sum of all
 elements of an array in parallel. The way it's solved on a GPU-like
 platform is that you initially divide your input array into pairs, compute
 those n/2 pairwise sums in parallel on separate threads and save each
 result in the first of the two elements. In the next iteration, you
 compute n/4 sums in parallel from the earlier sums, and so on until you
 are left with only two elements whose sum gives you the final sum.

 You are performing many sequential unionAll operations over inputs.size()
 Avro files. Assuming the unionAll() on DataFrame is blocking (and not a
 simple transformation like on RDDs) and actually performs the union
 operation, you will certainly benefit from parallelizing this loop. You
 could change the loop to something like below:

 // pseudo code only
 int n = inputs.size();
 // initialize a fixed-size executor with n/2 threads
 ExecutorService executor = Executors.newFixedThreadPool(n/2);
 DataFrame[] dfInput = new DataFrame[n/2];
 for (int i = 0; i < n/2; i++) {
     executor.submit(new Runnable() {
         public void run() {
             // union of i and i+n/2
             // showing [] only to bring out array access. Replace with
             // dfInput(i) in your code
             dfInput[i] = sqlContext.load(inputPaths.get(i),
                 "com.databricks.spark.avro").unionAll(
                     sqlContext.load(inputPaths.get(i + n/2),
                         "com.databricks.spark.avro"));
         }
     });
 }

 executor.awaitTermination(0, TimeUnit.SECONDS);

 int steps = (int) (Math.log(n) / Math.log(2.0));
 for (int s = 2; s <= steps; s++) {
     int stride = n / (1 << s);  // n/(2^s)
     for (int i = 0; i < stride; i++) {
         executor.submit(new Runnable() {
             public void run() {
                 // union of i and i+stride
                 // showing [] only to bring out array access. Replace
                 // with dfInput(i) and dfInput(i+stride) in your code
                 dfInput[i] = dfInput[i].unionAll(dfInput[i + stride]);
             }
         });
     }
     executor.awaitTermination(0, TimeUnit.SECONDS);
 }

 Let me know if it helped.

 -Kiran


 On Thu, Jun 4, 2015 at 8:57 PM, James Aley james.a...@swiftkey.com
 wrote:

 Thanks for the confirmation! We're quite new to Spark, so a little
 reassurance is a good thing to have sometimes :-)

 The thing that's concerning me at the moment is that my job doesn't seem
 to run any faster with more compute resources added to the cluster, and
 this is proving a little tricky to debug. There are a lot of variables, so
 here's what we've tried already and the apparent impact. If anyone has any
 further suggestions, we'd love to hear!

 * Increase the minimum number of output files (targetPartitions
 above), so that input groups smaller than our minimum chunk size can still
 be worked on by more than one executor. This does measurably speed things
 up, but obviously it's a trade-off, as the original goal for this job is to
 merge our data into fewer, larger files.

 * Submit many jobs in parallel, by running the above code in a Callable,
 on an executor pool. This seems to help, to some extent, but I'm not sure
 what else needs to be configured alongside it -- driver threads, scheduling
 policy, etc. We set scheduling to FAIR when doing this, as that seemed
 like the right approach, but we're not 100% confident. It seemed to help
 quite substantially anyway, so perhaps this just needs further tuning?

 * Increasing executors

Re: Optimisation advice for Avro-Parquet merge job

2015-06-04 Thread James Aley
Thanks for the confirmation! We're quite new to Spark, so a little
reassurance is a good thing to have sometimes :-)

The thing that's concerning me at the moment is that my job doesn't seem to
run any faster with more compute resources added to the cluster, and this
is proving a little tricky to debug. There are a lot of variables, so
here's what we've tried already and the apparent impact. If anyone has any
further suggestions, we'd love to hear!

* Increase the minimum number of output files (targetPartitions above),
so that input groups smaller than our minimum chunk size can still be
worked on by more than one executor. This does measurably speed things up,
but obviously it's a trade-off, as the original goal for this job is to
merge our data into fewer, larger files.

* Submit many jobs in parallel, by running the above code in a Callable, on
an executor pool. This seems to help, to some extent, but I'm not sure what
else needs to be configured alongside it -- driver threads, scheduling
policy, etc. We set scheduling to FAIR when doing this, as that seemed
like the right approach, but we're not 100% confident. It seemed to help
quite substantially anyway, so perhaps this just needs further tuning?

* Increasing executors, RAM, etc. This doesn't make a difference by itself
for this job, so I'm thinking we're already not fully utilising the
resources we have in a smaller cluster.

Again, any recommendations appreciated. Thanks for the help!


James.

On 4 June 2015 at 15:00, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 Hi

 2015-06-04 15:29 GMT+02:00 James Aley james.a...@swiftkey.com:

 Hi,

 We have a load of Avro data coming into our data systems in the form of
 relatively small files, which we're merging into larger Parquet files with
 Spark. I've been following the docs and the approach I'm taking seemed
 fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
 not the most optimal approach.

 I was wondering if anyone on this list might have some advice to make to
 make this job as efficient as possible. Here's some code:

 DataFrame dfInput = sqlContext.load(inputPaths.get(0),
     "com.databricks.spark.avro");
 long totalSize = getDirSize(inputPaths.get(0));

 for (int i = 1; i < inputs.size(); ++i) {
     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
         "com.databricks.spark.avro"));
     totalSize += getDirSize(inputPaths.get(i));
 }

 int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
 DataFrame outputFrame;

 // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
 // the synchronized block below. Suggestions welcome here too! :-)
 synchronized (this) {
     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
 }

 outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

 Here are some things bothering me:

- Conversion to an RDD and back again so that we can use coalesce() to
reduce the number of partitions. This is because we read that repartition()
is not as efficient as coalesce(), and local micro benchmarks seemed to
somewhat confirm that this was faster. Is this really a good idea though?
Should we be doing something else?

 Repartition uses coalesce but with a forced shuffle step. It's just a
 shortcut for coalesce(xxx, true).
 Doing a coalesce sounds correct, I'd do the same :) Note that if you add
 the shuffle step, then your partitions should be better balanced.


- Usage of unionAll() - this is the only way I could find to join the
separate data sets into a single data frame to save as Parquet. Is there a
better way?

 If you use the input formats directly, you can call
 FileInputFormat.addInputPath for each input path; it should perform at
 least as well as the union.
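
 For illustration, something along these lines (rough, untested sketch using
 the Avro mapreduce input format; sc here is the JavaSparkContext and the
 surrounding method is assumed to throw IOException):

 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.spark.api.java.JavaPairRDD;

 // Register every input directory on one job configuration, then read them
 // all as a single RDD instead of chaining unionAll() calls.
 Job job = Job.getInstance();
 for (String p : inputPaths) {
     FileInputFormat.addInputPath(job, new Path(p));
 }
 JavaPairRDD<AvroKey<GenericRecord>, NullWritable> records =
     sc.newAPIHadoopRDD(job.getConfiguration(),
                        AvroKeyInputFormat.class,
                        AvroKey.class,
                        NullWritable.class);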


- Do I need to be using the DataFrame API at all? I'm not querying
any data here, so the nice API for SQL-like transformations of the data
isn't being used. The DataFrame API just seemed like the path of least
resistance for working with Avro and Parquet. Would there be any advantage
to using hadoopRDD() with the appropriate Input/Output formats?



 Using the input/output formats directly sounds viable. But the snippet you
 show seems clean enough, and I am not sure there would be much value in
 making something (maybe) slightly faster but harder to understand.


 Eugen

 Any advice or tips greatly appreciated!


 James.






Optimisation advice for Avro-Parquet merge job

2015-06-04 Thread James Aley
Hi,

We have a load of Avro data coming into our data systems in the form of
relatively small files, which we're merging into larger Parquet files with
Spark. I've been following the docs and the approach I'm taking seemed
fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
not the most optimal approach.

I was wondering if anyone on this list might have some advice to make to
make this job as efficient as possible. Here's some code:

DataFrame dfInput = sqlContext.load(inputPaths.get(0),
    "com.databricks.spark.avro");
long totalSize = getDirSize(inputPaths.get(0));

for (int i = 1; i < inputs.size(); ++i) {
    dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
        "com.databricks.spark.avro"));
    totalSize += getDirSize(inputPaths.get(i));
}

int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
DataFrame outputFrame;

// Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
// the synchronized block below. Suggestions welcome here too! :-)
synchronized (this) {
    RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
    outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
}

outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

Here are some things bothering me:

   - Conversion to an RDD and back again so that we can use coalesce() to
   reduce the number of partitions. This is because we read that repartition()
   is not as efficient as coalesce(), and local micro benchmarks seemed to
   somewhat confirm that this was faster. Is this really a good idea though?
   Should we be doing something else?
   - Usage of unionAll() - this is the only way I could find to join the
   separate data sets into a single data frame to save as Parquet. Is there a
   better way?
   - Do I need to be using the DataFrame API at all? I'm not querying any
   data here, so the nice API for SQL-like transformations of the data isn't
   being used. The DataFrame API just seemed like the path of least resistance
   for working with Avro and Parquet. Would there be any advantage to using
   hadoopRDD() with the appropriate Input/Output formats?


Any advice or tips greatly appreciated!


James.


[Spark SQL] Problems creating a table in specified schema/database

2015-04-28 Thread James Aley
Hey all,

I'm trying to create tables from existing Parquet data in different
schemata. The following isn't working for me:

CREATE DATABASE foo;

CREATE TABLE foo.bar
USING com.databricks.spark.avro
OPTIONS (path '...');

-- Error: org.apache.spark.sql.AnalysisException: cannot recognize input
near 'USING' 'com' '.' in table name; line 1 pos 13 (state=,code=0)

I also tried

USE foo;
CREATE TABLE bar
USING com.databricks.spark.avro
OPTIONS (path '...');

-- Creates the table successfully, but in the default.* schema.


This is on Spark 1.3.1, running on YARN, Hive 0.13.1. Any suggestions?
Should this work?


James.


Advice using Spark SQL and Thrift JDBC Server

2015-04-07 Thread James Aley
Hello,

First of all, thank you to everyone working on Spark. I've only been using
it for a few weeks now but so far I'm really enjoying it. You saved me from
a big, scary elephant! :-)

I was wondering if anyone might be able to offer some advice about working
with the Thrift JDBC server? I'm trying to enable members of my team to
connect and run some basic SQL queries on a Spark cluster using their
favourite JDBC tools. Following the docs [1], I've managed to get something
simple up and running but I'd really appreciate it if someone can validate
my understanding here, as the docs don't go deeply into the details.

Here are a few questions I've not been able to find answers to myself:

1) What exactly is the relationship between the thrift server and Hive? I'm
guessing Spark is just making use of the Hive metastore to access table
definitions, and maybe some other things, is that the case?

2) Am I therefore right in thinking that SQL queries sent to the thrift
server are still executed on the Spark cluster, using Spark SQL, and Hive
plays no active part in computation of results?

3) What SQL flavour is actually supported by the Thrift Server? Is it Spark
SQL, Hive, or both? I'm confused, because I've seen it accepting Hive
CREATE TABLE syntax, but Spark SQL seems to work too?

4) When I run SQL queries using the Scala or Python shells, Spark seems to
figure out the schema by itself from my Parquet files very well, if I use
createTempTable on the DataFrame. It seems when running the thrift server,
I need to create a Hive table definition first? Is that the case, or did I
miss something? If it is, is there some sensible way to automate this?


Many thanks!

James

[1]
https://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbcodbc-server


Re: Advice using Spark SQL and Thrift JDBC Server

2015-04-07 Thread James Aley
Hi Michael,

Thanks so much for the reply - that really cleared a lot of things up for
me!

Let me just check that I've interpreted one of your suggestions for (4)
correctly... Would it make sense for me to write a small wrapper app that
pulls in hive-thriftserver as a dependency, iterates over my Parquet
directory structure to discover tables, registers each as a temp table in
some context, and then calls HiveThriftServer2.createWithContext as you
suggest?

This would mean that to add new content, all I need to do is restart that
app, which presumably could also be avoided fairly trivially by
periodically restarting the server with a new context internally. That
certainly beats manual curation of Hive table definitions, if it will work?
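
Something like this is what I had in mind (rough sketch only -- the
directory-walking helpers are our own placeholders, and sc is our
JavaSparkContext; you mentioned createWithContext, but the entry point I
can find in the 1.3/1.4 API is startWithContext, so I'll double-check the
name):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2;

HiveContext hiveContext = new HiveContext(sc.sc());
// Walk our Parquet layout and register one temp table per directory.
// discoverTableDirs() and tableNameFor() are placeholders for our own code.
for (String dir : discoverTableDirs("s3://bucket/warehouse/")) {
    DataFrame df = hiveContext.parquetFile(dir);
    df.registerTempTable(tableNameFor(dir));
}
// Hand the same context to the thrift server so the temp tables are
// visible to JDBC clients.
HiveThriftServer2.startWithContext(hiveContext);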


Thanks again,

James.

On 7 April 2015 at 19:30, Michael Armbrust mich...@databricks.com wrote:

 1) What exactly is the relationship between the thrift server and Hive?
 I'm guessing Spark is just making use of the Hive metastore to access table
 definitions, and maybe some other things, is that the case?


 Underneath the covers, the Spark SQL thrift server is executing queries
 using a HiveContext.  In this mode, nearly all computation is done with
 Spark SQL, but we try to maintain compatibility with Hive wherever
 possible.  This means that you can write your queries in HiveQL, read
 tables from the Hive metastore, and use Hive UDFs, UDTs, UDAFs, etc.

 The one exception here is Hive DDL operations (CREATE TABLE, etc).  These
 are passed directly to Hive code and executed there.  The Spark SQL DDL is
 sufficiently different that we always try to parse that first, and fall
 back to Hive when it does not parse.

 One possibly confusing point here is that you can persist Spark SQL
 tables into the Hive metastore, but this is not the same as a Hive table.
 We only use the metastore as a repo for metadata; we are not using Hive's
 format for the information in this case (as we have data sources that
 Hive does not understand, including things like schema auto-discovery).

 HiveQL DDL, run by Hive but readable by Spark SQL: CREATE TABLE t (x
 INT) STORED AS PARQUET
 Spark SQL DDL, run by Spark SQL, stored in the metastore, cannot be read by
 Hive: CREATE TABLE t USING parquet (path '/path/to/data')


 2) Am I therefore right in thinking that SQL queries sent to the thrift
 server are still executed on the Spark cluster, using Spark SQL, and Hive
 plays no active part in computation of results?


 Correct.

 3) What SQL flavour is actually supported by the Thrift Server? Is it
 Spark SQL, Hive, or both? I'm confused, because I've seen it accepting
 Hive CREATE TABLE syntax, but Spark SQL seems to work too?


 HiveQL++ (with Spark SQL DDL).  You can make it use our simple SQL parser
 by `SET spark.sql.dialect=sql`, but honestly you probably don't want to do
 this.  The included SQL parser is mostly there for people who have
 dependency conflicts with Hive.


 4) When I run SQL queries using the Scala or Python shells, Spark seems
 to figure out the schema by itself from my Parquet files very well, if I
 use createTempTable on the DataFrame. It seems when running the thrift
 server, I need to create a Hive table definition first? Is that the case,
 or did I miss something? If it is, is there some sensible way to automate
 this?


 Temporary tables are only visible to the SQLContext that creates them.  If
 you want them to be visible to the server, you need to either start the
 thrift server with the same context your program is using
 (see HiveThriftServer2.createWithContext) or make a metastore table.  This
 can be done using Spark SQL DDL:

 CREATE TABLE t USING parquet (path '/path/to/data')

 Michael