Range partition for parquet file?

2016-05-27 Thread Rex Xiong
Hi,

I have a Spark job that outputs a DataFrame containing a column named Id,
which is a GUID string.
We will use Id to filter data in another Spark application, so it should be
a partition key.

I found these two methods on the Internet:

1.
The DataFrame.write.partitionBy("Id") method would help, but the possible
value space for a GUID is too big; I would prefer a range partition that
splits the data evenly into 100 partitions.

2.
Another way is DataFrame.repartition("Id"), but the result seems to only
stay in memory; once it's saved and then loaded from another Spark
application, do we need to repartition it again?

In the end, what is the relationship between Parquet partitions and
DataFrame.repartition?
E.g.
The Parquet data is stored physically under /year=X/month=Y. I load this
data into a DataFrame, then call DataFrame.repartition("Id") and run this
query:
df.filter("year = 2016 and month = 5 and Id = ''")
Will Parquet folder pruning still work? Or, since the data has already been
repartitioned by Id, does it need to scan all year/month combinations?

Thanks
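
P.S. A minimal sketch of the bucketing approach I have in mind (assuming a derived
bucket column is acceptable; the column name IdBucket, the bucket count 100, and
the paths are only placeholders):

import org.apache.spark.sql.functions._

// Derive a stable bucket (0-99) from the GUID string; crc32 expects binary, so cast first
val bucketed = df.withColumn("IdBucket", crc32(col("Id").cast("binary")) % 100)
bucketed.write.partitionBy("IdBucket").parquet("/data/table_by_id")

// The reading application computes the same bucket for the GUID it wants,
// so only 1 of the 100 folders has to be scanned
val id = "some-guid-value"
val crc = new java.util.zip.CRC32()
crc.update(id.getBytes("UTF-8"))
val bucket = crc.getValue % 100
sqlContext.read.parquet("/data/table_by_id")
  .filter(col("IdBucket") === bucket && col("Id") === id)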


Re: Issue of Hive parquet partitioned table schema mismatch

2015-11-06 Thread Rex Xiong
spark.sql.hive.convertMetastoreParquet is true. I can't reproduce the issue of
scanning all partitions now... :P

Anyway, I found another email thread "Re: Spark Sql behaves strangely with
tables with a lot of partitions"

I observe the same issue as Jerrick: the Spark driver calls listStatus
on the whole table folder even if I load a pure Hive table (not created by
Spark), and it costs some time for a large partitioned table.

Even though spark.sql.parquet.cacheMetadata is true, this listStatus
runs before every query; it is not cached.
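
For reference, both settings are quick to confirm from spark-shell (a sketch; the
second argument is just a fallback default):

sqlContext.getConf("spark.sql.hive.convertMetastoreParquet", "true")
sqlContext.getConf("spark.sql.parquet.cacheMetadata", "true")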



2015-11-05 8:50 GMT+08:00 Cheng Lian <lian.cs@gmail.com>:

> Is there any chance that " spark.sql.hive.convertMetastoreParquet" is
> turned off?
>
> Cheng
>
> On 11/4/15 5:15 PM, Rex Xiong wrote:
>
> Thanks Cheng Lian.
> I found that in 1.5, if I use Spark to create this table with partition
> discovery, partition pruning can be performed; but for my old table
> definition in pure Hive, the execution plan does a Parquet scan across
> all partitions, and it runs very slowly.
> It looks like the execution plan optimization is different.
>
> 2015-11-03 23:10 GMT+08:00 Cheng Lian <lian.cs@gmail.com>:
>
>> SPARK-11153 should be irrelevant because you are filtering on a partition
>> key while SPARK-11153 is about Parquet filter push-down and doesn't affect
>> partition pruning.
>>
>> Cheng
>>
>>
>> On 11/3/15 7:14 PM, Rex Xiong wrote:
>>
>> We found the query performance is very poor due to this issue:
>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-11153
>> We usually filter on the partition key (the date); it's a string type in
>> 1.3.1 and works great.
>> But in 1.5, it needs to do a Parquet scan for all partitions.
>> On Oct 31, 2015 at 7:38 PM, "Rex Xiong" <bycha...@gmail.com> wrote:
>>
>>> Adding this thread back to the email list; I forgot to reply all.
>>> On Oct 31, 2015 at 7:23 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
>>>
>>>> Not that I know of.
>>>>
>>>> On Sat, Oct 31, 2015 at 12:22 PM, Rex Xiong <bycha...@gmail.com> wrote:
>>>>
>>>>> Good to know, I will have a try.
>>>>> So there is no easy way to achieve it with a pure Hive method?
>>>>> On Oct 31, 2015 at 7:17 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
>>>>>
>>>>>> Yeah, this was rewritten to be faster in Spark 1.5.  We use it with
>>>>>> 10,000s of partitions.
>>>>>>
>>>>>> On Sat, Oct 31, 2015 at 7:17 AM, Rex Xiong <bycha...@gmail.com> wrote:
>>>>>>
>>>>>>> 1.3.1
>>>>>>> Is there a lot of improvement in 1.5+?
>>>>>>>
>>>>>>> 2015-10-30 19:23 GMT+08:00 Michael Armbrust <mich...@databricks.com>:
>>>>>>>
>>>>>>>>> We have tried the schema merging feature, but it's too slow; there are
>>>>>>>>> hundreds of partitions.
>>>>>>>>>
>>>>>>>> Which version of Spark?


Re: Issue of Hive parquet partitioned table schema mismatch

2015-11-03 Thread Rex Xiong
We found the query performance is very poor due to this issue:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-11153
We usually filter on the partition key (the date); it's a string type in
1.3.1 and works great.
But in 1.5, it needs to do a Parquet scan for all partitions.
On Oct 31, 2015 at 7:38 PM, "Rex Xiong" <bycha...@gmail.com> wrote:

> Adding this thread back to the email list; I forgot to reply all.
> On Oct 31, 2015 at 7:23 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
>
>> Not that I know of.
>>
>> On Sat, Oct 31, 2015 at 12:22 PM, Rex Xiong <bycha...@gmail.com> wrote:
>>
>>> Good to know, I will have a try.
>>> So there is no easy way to achieve it with a pure Hive method?
>>> On Oct 31, 2015 at 7:17 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
>>>
>>>> Yeah, this was rewritten to be faster in Spark 1.5.  We use it with
>>>> 10,000s of partitions.
>>>>
>>>> On Sat, Oct 31, 2015 at 7:17 AM, Rex Xiong <bycha...@gmail.com> wrote:
>>>>
>>>>> 1.3.1
>>>>> Is there a lot of improvement in 1.5+?
>>>>>
>>>>> 2015-10-30 19:23 GMT+08:00 Michael Armbrust <mich...@databricks.com>:
>>>>>
>>>>>>> We have tried the schema merging feature, but it's too slow; there are
>>>>>>> hundreds of partitions.
>>>>>>>
>>>>>> Which version of Spark?


Re: Issue of Hive parquet partitioned table schema mismatch

2015-10-31 Thread Rex Xiong
Adding this thread back to the email list; I forgot to reply all.
On Oct 31, 2015 at 7:23 PM, "Michael Armbrust" <mich...@databricks.com> wrote:

> Not that I know of.
>
> On Sat, Oct 31, 2015 at 12:22 PM, Rex Xiong <bycha...@gmail.com> wrote:
>
>> Good to know, I will have a try.
>> So there is no easy way to achieve it with a pure Hive method?
>> On Oct 31, 2015 at 7:17 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
>>
>>> Yeah, this was rewritten to be faster in Spark 1.5.  We use it with
>>> 10,000s of partitions.
>>>
>>> On Sat, Oct 31, 2015 at 7:17 AM, Rex Xiong <bycha...@gmail.com> wrote:
>>>
>>>> 1.3.1
>>>> Is there a lot of improvement in 1.5+?
>>>>
>>>> 2015-10-30 19:23 GMT+08:00 Michael Armbrust <mich...@databricks.com>:
>>>>
>>>>>> We have tried the schema merging feature, but it's too slow; there are
>>>>>> hundreds of partitions.
>>>>>>
>>>>> Which version of Spark?


Issue of Hive parquet partitioned table schema mismatch

2015-10-30 Thread Rex Xiong
Hi folks,

I have a Hive external table with partitions.
Every day, an app generates a new partition day=yyyy-MM-dd stored as
Parquet and runs the Hive ADD PARTITION command.
In some cases, we add an additional column to new partitions and update the
Hive table schema; then a query across new and old partitions fails with an
exception:

org.apache.hive.service.cli.HiveSQLException:
org.apache.spark.sql.AnalysisException: cannot resolve 'newcolumn' given
input columns 

We have tried the schema merging feature, but it's too slow; there are
hundreds of partitions.
Is it possible to bypass this schema check and return a default value (such
as null) for missing columns?

Thank you
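
P.S. One workaround we are considering on the Spark side (only a sketch, assuming
the full table schema is known up front and a 1.4+ style DataFrameReader; the
names and paths are placeholders, and I have not verified how this interacts with
the Hive metastore path): read the Parquet folder with an explicit schema, so old
part-files that lack the new column simply return null for it instead of relying
on schema merging:

import org.apache.spark.sql.types._

// Hypothetical full schema, including the column that was added later
val fullSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("newcolumn", StringType)))

val df = sqlContext.read.schema(fullSchema).parquet("/path/to/table")
df.registerTempTable("all_partitions")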


Issue of jar dependency in yarn-cluster mode

2015-10-16 Thread Rex Xiong
Hi folks,

In my spark application, executor task depends on snakeyaml-1.10.jar
I build it with Maven and it works fine:
spark-submit --master local --jars d:\snakeyaml-1.10.jar ...

But when I try to run it in YARN, I have an issue; it seems the Spark
executor cannot find the jar file:
spark-submit --master yarn-cluster --jars hdfs://../snakeyaml-1.10.jar
  ..

java.lang.NoSuchMethodError:
org.yaml.snakeyaml.Yaml.(Lorg/yaml/snakeyaml/constructor/BaseConstructor;)V

I checked one executor container folder; snakeyaml-1.10.jar has been
successfully downloaded, and on the Spark driver page, in the Environment tab,
spark.yarn.secondary.jars also contains snakeyaml-1.10.jar.

I have no idea why it doesn't work. Could someone help take a look?

Thanks


Re: Issue of jar dependency in yarn-cluster mode

2015-10-16 Thread Rex Xiong
I finally resolved this issue by adding
--conf spark.executor.extraClassPath=snakeyaml-1.10.jar
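
For reference, the full submit command ends up looking roughly like this (a sketch;
the HDFS path is elided as above and the application jar name is a placeholder):

spark-submit --master yarn-cluster \
  --jars hdfs://.../snakeyaml-1.10.jar \
  --conf spark.executor.extraClassPath=snakeyaml-1.10.jar \
  my-app.jar

As far as I understand, the relative jar name works on the executor side because
files distributed via --jars are localized into each container's working directory.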

2015-10-16 22:57 GMT+08:00 Rex Xiong <bycha...@gmail.com>:

> Hi folks,
>
> In my spark application, executor task depends on snakeyaml-1.10.jar
> I build it with Maven and it works fine:
> spark-submit --master local --jars d:\snakeyaml-1.10.jar ...
>
> But when I try to run it in YARN, I have an issue; it seems the Spark
> executor cannot find the jar file:
> spark-submit --master yarn-cluster --jars hdfs://../snakeyaml-1.10.jar
>   ..
>
> java.lang.NoSuchMethodError:
> org.yaml.snakeyaml.Yaml.(Lorg/yaml/snakeyaml/constructor/BaseConstructor;)V
>
> I checked one executor container folder; snakeyaml-1.10.jar has been
> successfully downloaded, and on the Spark driver page, in the Environment tab,
> spark.yarn.secondary.jars also contains snakeyaml-1.10.jar.
>
> I have no idea why it doesn't work. Could someone help take a look?
>
> Thanks


Jar is cached in yarn-cluster mode?

2015-10-09 Thread Rex Xiong
I use "spark-submit -master yarn-cluster hdfs://.../a.jar .." to submit
my app to yarn.
Then I update this a.jar in HDFS, run the command again, I found a line of
log that was been removed still exist in "yarn logs ".
Is there a cache mechanism I need to disable?

Thanks


Is it possible to disable AM page proxy in Yarn client mode?

2015-08-03 Thread Rex Xiong
In YARN client mode, the Spark driver UI URL is redirected to the YARN web
proxy server, but I don't want to use this dynamic name. Is it possible to
still use host:port as in standalone mode?


DESCRIBE FORMATTED doesn't work in Hive Thrift Server?

2015-07-05 Thread Rex Xiong
Hi,

I tried to use DESCRIBE FORMATTED for a table created in Spark, but it seems
the result values are all empty. I want to get the metadata for the table;
what other options are there?

Thanks

+---+
|result |
+---+
| # col_name|
|   |
| col   |
|   |
| # Detailed Table Information  |
| Database: |
| Owner:|
| CreateTime:   |
| LastAccessTime:   |
| Protect Mode: |
| Retention:|
| Location: |
| Table Type:   |
| Table Parameters: |
|   |
|   |
|   |
|   |
|   |
|   |
|   |
|   |
|   |
|   |
|   |
| # Storage Information |
| SerDe Library:|
| InputFormat:  |
| OutputFormat: |
| Compressed:   |
| Num Buckets:  |
| Bucket Columns:   |
| Sort Columns: |
| Storage Desc Params:  |
|   |
|   |
+---+
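
In case it helps, a couple of alternatives I would try from spark-shell (a sketch;
my_table is a placeholder):

sqlContext.table("my_table").printSchema()     // column names and types
sqlContext.sql("DESCRIBE my_table").show(100)  // plain DESCRIBE instead of DESCRIBE FORMATTED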


How to get Master UI with ZooKeeper HA setup?

2015-05-11 Thread Rex Xiong
Hi,

We have a 3-node master setup with ZooKeeper HA.
The driver can find the master with spark://xxx:xxx,xxx:xxx,xxx:xxx,
but how can I find the active Master's UI without looping through all 3
nodes?

Thanks


Re: Parquet Hive table become very slow on 1.3?

2015-04-22 Thread Rex Xiong
Yin,

Thanks for your reply.
We have already applied this PR to our 1.3.0.
As Xudong mentioned, we have thousands of Parquet files, so the first read is
very slow, and another app adds more files and refreshes the table regularly.
Cheng Lian's PR 5334 seems able to resolve this issue; it will skip reading
all footers if we set auto merge to false.
But it's not done yet.

Thanks
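
P.S. Regarding Xudong's question further down about merging the many part-files
into fewer files: a sketch of what could be done on the write side, assuming the
producing Spark job can be changed (the partition count 32 and the output path
are placeholders):

// Fewer tasks at write time means fewer .parquet part-files, so fewer footers to read later
val fewer = df.repartition(32)
fewer.saveAsParquetFile("/path/to/output")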

2015-04-22 23:10 GMT+08:00 Yin Huai yh...@databricks.com:

 Xudong and Rex,

 Can you try 1.3.1? With PR 5339 http://github.com/apache/spark/pull/5339,
 after we get a Hive Parquet table from the metastore and convert it to our
 native Parquet code path, we will cache the converted relation. For now, the
 first access to that Hive Parquet table reads all of the footers (when you
 first refer to that table in a query or call
 sqlContext.table(hiveParquetTable)). All of your later accesses will hit
 the metadata cache.

 Thanks,

 Yin

 On Tue, Apr 21, 2015 at 1:13 AM, Rex Xiong bycha...@gmail.com wrote:

 We have a similar issue with a massive number of Parquet files. Cheng Lian,
 could you have a look?

 2015-04-08 15:47 GMT+08:00 Zheng, Xudong dong...@gmail.com:

 Hi Cheng,

 I tried both of these patches, and they still don't seem to resolve my issue.
 I found that most of the time is spent on this line in newParquet.scala:

 ParquetFileReader.readAllFootersInParallel(
   sparkContext.hadoopConfiguration, seqAsJavaList(leaves),
 taskSideMetaData)

 This needs to read all the files under the Parquet folder, and our Parquet
 folder has a lot of Parquet files (nearly 2000); reading one file takes about
 2 seconds, so it becomes very slow... And PR 5231 did not skip this step, so
 it does not resolve my issue.

 Our Parquet files are generated by a Spark job, so the number of .parquet
 files is the same as the number of tasks; that is why we have so many files.
 But these files actually have the same schema. Is there any way to merge
 these files into one, or avoid scanning each of them?

 On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  Hey Xudong,

 We have been digging into this issue for a while, and believe PR 5339
 http://github.com/apache/spark/pull/5339 and PR 5334
 http://github.com/apache/spark/pull/5334 should fix this issue.

 There are two problems:

 1. Normally we cache Parquet table metadata for better performance, but
 when converting Hive metastore Parquet tables, the cache is not used. Thus
 heavy operations like schema discovery are done every time a metastore
 Parquet table is converted.
 2. With Parquet task side metadata reading (which is turned on by
 default), we can actually skip the row group information in the footer.
 However, we accidentally called a Parquet function which doesn't skip row
 group information.

 For your question about schema merging, Parquet allows different
 part-files to have different but compatible schemas. For example,
 part-1.parquet has columns a and b, while part-2.parquet may have
 columns a and c. In some cases, the summary files (_metadata and
 _common_metadata) contain the merged schema (a, b, and c), but it's not
 guaranteed. For example, when the user-defined metadata stored in different
 part-files contains different values for the same key, Parquet simply gives
 up writing summary files. That's why all part-files must be touched to get
 a precise merged schema.

 However, in scenarios where a centralized arbitrative schema is
 available (e.g. Hive metastore schema, or the schema provided by user via
 data source DDL), we don't need to do schema merging on driver side, but
 defer it to executor side and each task only needs to reconcile those
 part-files it needs to touch. This is also what the Parquet developers did
 recently for parquet-hadoop
 https://github.com/apache/incubator-parquet-mr/pull/45.

 Cheng


 On 3/31/15 11:49 PM, Zheng, Xudong wrote:

 Thanks Cheng!

  Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issues,
 but PR 5231 seems not to. Not sure what else I did wrong ...

  BTW, actually, we are very interested in the schema merging feature
 in Spark 1.3, so both of these two solutions will disable this feature, right?
 It seems that Parquet metadata is stored in a file named _metadata in the
 Parquet file folder (each folder is a partition, as we use a partitioned table),
 so why do we need to scan all Parquet part-files? Is there any other solution
 that could keep the schema merging feature at the same time? We really like
 this feature :)

 On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  Hi Xudong,

 This is probably because Parquet schema merging is turned on by
 default. This is generally useful for Parquet files with different but
 compatible schemas. But it needs to read metadata from all Parquet
 part-files. This can be problematic when reading Parquet files with lots of
 part-files, especially when the user doesn't need schema merging.

 This issue is tracked by SPARK-6575, and here is a PR for it:
 https://github.com/apache/spark/pull/5231. This PR adds a configuration
 to disable schema merging by default when doing Hive metastore Parquet
 table conversion.

Re: Parquet Hive table become very slow on 1.3?

2015-04-21 Thread Rex Xiong
We have a similar issue with a massive number of Parquet files. Cheng Lian,
could you have a look?
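
P.S. For the spark.sql.parquet.useDataSourceApi workaround Cheng mentions below,
the setting can be applied per session in spark-shell (a sketch):

sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
// or equivalently via SQL
sqlContext.sql("SET spark.sql.parquet.useDataSourceApi=false")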

2015-04-08 15:47 GMT+08:00 Zheng, Xudong dong...@gmail.com:

 Hi Cheng,

 I tried both of these patches, and they still don't seem to resolve my issue.
 I found that most of the time is spent on this line in newParquet.scala:

 ParquetFileReader.readAllFootersInParallel(
   sparkContext.hadoopConfiguration, seqAsJavaList(leaves),
 taskSideMetaData)

 This needs to read all the files under the Parquet folder, and our Parquet
 folder has a lot of Parquet files (nearly 2000); reading one file takes about
 2 seconds, so it becomes very slow... And PR 5231 did not skip this step, so
 it does not resolve my issue.

 Our Parquet files are generated by a Spark job, so the number of .parquet
 files is the same as the number of tasks; that is why we have so many files.
 But these files actually have the same schema. Is there any way to merge
 these files into one, or avoid scanning each of them?

 On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Xudong,

 We have been digging into this issue for a while, and believe PR 5339
 http://github.com/apache/spark/pull/5339 and PR 5334
 http://github.com/apache/spark/pull/5334 should fix this issue.

 There are two problems:

 1. Normally we cache Parquet table metadata for better performance, but
 when converting Hive metastore Parquet tables, the cache is not used. Thus
 heavy operations like schema discovery are done every time a metastore
 Parquet table is converted.
 2. With Parquet task side metadata reading (which is turned on by
 default), we can actually skip the row group information in the footer.
 However, we accidentally called a Parquet function which doesn't skip row
 group information.

 For your question about schema merging, Parquet allows different
 part-files to have different but compatible schemas. For example,
 part-1.parquet has columns a and b, while part-2.parquet may have
 columns a and c. In some cases, the summary files (_metadata and
 _common_metadata) contain the merged schema (a, b, and c), but it's not
 guaranteed. For example, when the user-defined metadata stored in different
 part-files contains different values for the same key, Parquet simply gives
 up writing summary files. That's why all part-files must be touched to get
 a precise merged schema.

 However, in scenarios where a centralized arbitrative schema is available
 (e.g. Hive metastore schema, or the schema provided by user via data source
 DDL), we don't need to do schema merging on driver side, but defer it to
 executor side and each task only needs to reconcile those part-files it
 needs to touch. This is also what the Parquet developers did recently for
 parquet-hadoop https://github.com/apache/incubator-parquet-mr/pull/45.

 Cheng


 On 3/31/15 11:49 PM, Zheng, Xudong wrote:

 Thanks Cheng!

 Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issues,
 but PR 5231 seems not to. Not sure what else I did wrong ...

 BTW, actually, we are very interested in the schema merging feature
 in Spark 1.3, so both of these two solutions will disable this feature, right?
 It seems that Parquet metadata is stored in a file named _metadata in the
 Parquet file folder (each folder is a partition, as we use a partitioned table),
 so why do we need to scan all Parquet part-files? Is there any other solution
 that could keep the schema merging feature at the same time? We really like
 this feature :)

 On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  Hi Xudong,

 This is probably because Parquet schema merging is turned on by
 default. This is generally useful for Parquet files with different but
 compatible schemas. But it needs to read metadata from all Parquet
 part-files. This can be problematic when reading Parquet files with lots of
 part-files, especially when the user doesn't need schema merging.

 This issue is tracked by SPARK-6575, and here is a PR for it:
 https://github.com/apache/spark/pull/5231. This PR adds a configuration
 to disable schema merging by default when doing Hive metastore Parquet
 table conversion.

 Another workaround is to fall back to the old Parquet code by setting
 spark.sql.parquet.useDataSourceApi to false.

 Cheng


 On 3/31/15 2:47 PM, Zheng, Xudong wrote:

 Hi all,

  We are using a Parquet Hive table, and we are upgrading to Spark 1.3.
 But we find that just a simple COUNT(*) query is much slower (100x) than on
 Spark 1.2.

  I find most of the time is spent on the driver getting HDFS blocks. I see a
 large amount of the logs below printed:

  15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 
 2097ms
 15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
   fileLength=77153436
   underConstruction=false
   
 blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
  getBlockSize()=77153436; corrupt=false; offset=0; 
 locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
   
 

Issue of sqlContext.createExternalTable with parquet partition discovery after changing folder structure

2015-04-04 Thread Rex Xiong
Hi Spark Users,

I'm testing the new Parquet partition discovery feature in 1.3.
I have 2 sub-folders, each with 800 rows:
/data/table1/key=1
/data/table1/key=2

In spark-shell, run this command:

val t = sqlContext.createExternalTable("table1", "hdfs:///data/table1",
"parquet")

t.count


It shows 1600 successfully.

But after that, I added a new folder /data/table1/key=3 and ran t.count
again; it still gives me 1600, not 2400.


I tried restarting spark-shell, then running

val t = sqlContext.table("table1")

t.count


It's 2400 now.


I'm wondering whether there is a partition cache in the driver. I tried
setting spark.sql.parquet.cacheMetadata to false and testing again;
unfortunately it doesn't help.


How can I disable this partition cache or force a refresh of the cache?


Thanks
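
P.S. One more thing I plan to try (a sketch, assuming refreshTable is available on
the HiveContext/SQLContext in this version):

val t = sqlContext.table("table1")
sqlContext.refreshTable("table1")  // drop the cached metadata so new folders are rediscovered
t.count                            // hopefully picks up the new key=3 folder now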


Parquet timestamp support for Hive?

2015-04-03 Thread Rex Xiong
Hi,

I got this error when creating a Hive table from a Parquet file:
DDLTask: org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.UnsupportedOperationException: Parquet does not support
timestamp. See HIVE-6384

I checked HIVE-6384; it's fixed in 0.14.
The Hive in the Spark build is a customized version 0.13.1a
(GroupId: org.spark-project.hive). Is it possible to get the source code
for it and apply the patch from HIVE-6384?

Thanks


Return jobid for a hive query?

2015-03-03 Thread Rex Xiong
Hi there,

I have an app talking to the Spark Hive Thrift Server using Hive ODBC, and
querying is OK.
But through this interface I can't get many running details when my query
goes wrong; only one error message is shown.
I want to get the job ID for my query, so that I can go to the Application
Detail UI to see what's going on.
Is there a way to achieve my goal? Or do I need some customization in the
error-throwing path?

Thanks