Re: Sorting in Spark on multiple partitions

2018-06-03 Thread Jörn Franke
You partition by userid, why do you then sort again by userid in the partition? 
Can you try to remove userid from the sort? 

How do you check if the sort is correct or not?

What is the underlying objective of the sort? Do you have more information on 
schema and data?

> On 4. Jun 2018, at 05:47, Sing, Jasbir  wrote:
> 
> Hi Team,
>  
> We are currently using Spark 2.2.0 and facing some challenges in sorting data 
> across multiple partitions.
> We have tried the approaches below:
>  
> Spark SQL approach:
> var query = "select * from data distribute by " + userid + " sort by " + userid + ", " + time
>  
> This query returns correct results in Hive but not in Spark SQL.  
> var newDf = data.repartition(col(userid)).orderBy(userid, time)
> var newDf = data.repartition(col(userid)).sortWithinPartitions(userid, time)
>  
>  
> But none of the above approaches gives correct results for sorting the data.
> Please suggest what could be done here.
>  
> Thanks & Regards,
> Neha Jain
> 
> 


Sorting in Spark on multiple partitions

2018-06-03 Thread Sing, Jasbir
Hi Team,

We are currently using Spark 2.2.0 and facing some challenges in sorting data 
across multiple partitions.
We have tried the approaches below:


  1.  Spark SQL approach:
     *   var query = "select * from data distribute by " + userid + " sort by " + userid + ", " + time


This query returns correct results in Hive but not in Spark SQL.

  1.  var newDf = data.repartition(col(userid)).orderBy(userid, time)
  2.  var newDf = data.repartition(col(userid)).sortWithinPartitions(userid, time)


But none of the above approaches gives correct results for sorting the data.
Please suggest what could be done here.

Thanks & Regards,
Neha Jain
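
A minimal PySpark sketch of the two DataFrame-level options above, added for illustration only and not taken from the thread; the DataFrame name `data` and the columns userid and time come from the question, everything else is an assumption:

from pyspark.sql.functions import col

# Per-partition ordering: all rows for a userid land in one partition and are
# sorted by (userid, time) inside it, but there is no ordering across partitions.
per_partition = (data
                 .repartition(col("userid"))
                 .sortWithinPartitions(col("userid"), col("time")))

# A single global ordering instead requires orderBy, which performs its own
# range-partitioning shuffle.
globally_sorted = data.orderBy(col("userid"), col("time"))

Which of the two is "correct" depends on the objective Jörn asks about: per-user ordering is what sortWithinPartitions gives, while a totally ordered output needs orderBy.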





Re: [Spark SQL] Is it possible to do stream to stream inner join without event time?

2018-06-03 Thread Becket Qin
Bump. Any direction would be helpful. Thanks.

On Fri, Jun 1, 2018 at 6:10 PM, Becket Qin  wrote:

> Hi,
>
> I am new to Spark and I'm trying to run a few queries from TPC-H using
> Spark SQL.
>
> According to the documentation here, it is OPTIONAL to have a watermark defined
> in the case of an inner join between two streams. However, I keep getting the
> following exception:
>
> org.apache.spark.sql.AnalysisException: Append output mode not supported
> when there are streaming aggregations on streaming DataFrames/DataSets
> without watermark
>
> So it looks like the watermark is mandatory. Because there is no timestamp
> in the TPC-H records, I am not able to specify a watermark based on event time.
> Is there a recommended workaround, e.g. using processing time instead of
> event time?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
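
A hedged sketch of the processing-time workaround asked about above (not from the thread): stamp each record with current_timestamp() and declare the watermark on that column, so that streaming aggregations can run in append mode even without an event-time field. The rate source, app name, and window size below are placeholders.

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, window, col

spark = SparkSession.builder.appName("proc-time-watermark").getOrCreate()

# Stand-in streaming source; substitute the actual TPC-H stream being read.
stream = (spark.readStream
               .format("rate")
               .option("rowsPerSecond", 10)
               .load())

# Use processing time as a surrogate event time and put the watermark on it.
agg = (stream
       .withColumn("proc_time", current_timestamp())
       .withWatermark("proc_time", "1 minute")
       .groupBy(window(col("proc_time"), "1 minute"))
       .count())

query = (agg.writeStream
            .outputMode("append")
            .format("console")
            .start())

The trade-off is that results are then keyed to arrival time rather than any property of the data, which may or may not be acceptable for TPC-H-style queries.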


Spark task default timeout

2018-06-03 Thread Shushant Arora
Hi

I have a Spark application where the driver starts a few tasks, and in each
task, which is a VoidFunction, I have a long-running infinite loop. I have set
speculative execution to false.

Will Spark kill my task after some time (timeout), or will the tasks run
indefinitely?
If tasks will be killed after some time, what is that duration, and how can I
run tasks indefinitely?


Thanks


Re: Append In-Place to S3

2018-06-03 Thread Tayler Lawrence Jones
Sorry, my last message is actually not true for anti join; I was thinking of
semi join.

-TJ

On Sun, Jun 3, 2018 at 14:57 Tayler Lawrence Jones 
wrote:

> A left join with null filter is only the same as a left anti join if the
> join keys can be guaranteed unique in the existing data. Since hive tables
> on s3 offer no unique guarantees outside of your processing code, I
> recommend using left anti join over left join + null filter.
>
> -TJ
>
> On Sun, Jun 3, 2018 at 14:47 ayan guha  wrote:
>
>> I do not use anti join semantics, but you can use left outer join and
>> then filter out nulls from right side. Your data may have dups on the
>> columns separately but it should not have dups on the composite key ie all
>> columns put together.
>>
>> On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones <
>> t.jonesd...@gmail.com> wrote:
>>
>>> The issue is not the append vs overwrite - perhaps those responders do
>>> not know Anti join semantics. Further, Overwrite on s3 is a bad pattern due
>>> to s3 eventual consistency issues.
>>>
>>> First, your sql query is wrong as you don’t close the parenthesis of the
>>> CTE (“with” part). In fact, it looks like you don’t need that with at all,
>>> and the query should fail to parse. If that does parse, I would open a bug
>>> on the spark jira.
>>>
>>> Can you provide the query that you are using to detect duplication so I
>>> can see if your deduplication logic matches the detection query?
>>>
>>> -TJ
>>>
>>> On Sat, Jun 2, 2018 at 10:22 Aakash Basu 
>>> wrote:
>>>
As Jay correctly suggested, if you're joining then overwrite; otherwise only
append, as it removes dups.

 I think, in this scenario, just change it to write.mode('overwrite')
 because you're already reading the old data and your job would be done.


 On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim,  wrote:

> Hi Jay,
>
> Thanks for your response. Are you saying to append the new data, then remove
> the duplicates from the whole data set afterwards, overwriting the existing
> data set with the new data set with the appended values? I will give that
> a try.
>
> Cheers,
> Ben
>
> On Fri, Jun 1, 2018 at 11:49 PM Jay 
> wrote:
>
>> Benjamin,
>>
>> The append will append the "new" data to the existing data without removing
>> the duplicates. You would need to overwrite the file every time if you need
>> unique values.
>>
>> Thanks,
>> Jayadeep
>>
>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim 
>> wrote:
>>
>>> I have a situation where I am trying to add only new rows to an existing
>>> data set that lives in S3 as gzipped parquet files, looping and appending
>>> for each hour of the day. First, I create a DF from the existing data, then
>>> I use a query to create another DF with the data that is new. Here is the
>>> code snippet.
>>>
>>> df = spark.read.parquet(existing_data_path)
>>> df.createOrReplaceTempView('existing_data')
>>> new_df = spark.read.parquet(new_data_path)
>>> new_df.createOrReplaceTempView('new_data')
>>> append_df = spark.sql(
>>> """
>>> WITH ids AS (
>>> SELECT DISTINCT
>>> source,
>>> source_id,
>>> target,
>>> target_id
>>> FROM new_data i
>>> LEFT ANTI JOIN existing_data im
>>> ON i.source = im.source
>>> AND i.source_id = im.source_id
>>> AND i.target = im.target
>>> AND i.target = im.target_id
>>> """
>>> )
>>> append_df.coalesce(1).write.parquet(existing_data_path,
>>> mode='append', compression='gzip')
>>>
>>>
>>> I thought this would append new rows and keep the data unique, but I am
>>> seeing many duplicates. Can someone help me with this and tell me what I am
>>> doing wrong?
>>>
>>> Thanks,
>>> Ben
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>


Re: Append In-Place to S3

2018-06-03 Thread Tayler Lawrence Jones
A left join with null filter is only the same as a left anti join if the
join keys can be guaranteed unique in the existing data. Since hive tables
on s3 offer no unique guarantees outside of your processing code, I
recommend using left anti join over left join + null filter.

-TJ

On Sun, Jun 3, 2018 at 14:47 ayan guha  wrote:

> I do not use anti join semantics, but you can use left outer join and then
> filter out nulls from right side. Your data may have dups on the columns
> separately but it should not have dups on the composite key ie all columns
> put together.
>
> On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones <
> t.jonesd...@gmail.com> wrote:
>
>> The issue is not the append vs overwrite - perhaps those responders do
>> not know Anti join semantics. Further, Overwrite on s3 is a bad pattern due
>> to s3 eventual consistency issues.
>>
>> First, your sql query is wrong as you don’t close the parenthesis of the
>> CTE (“with” part). In fact, it looks like you don’t need that with at all,
>> and the query should fail to parse. If that does parse, I would open a bug
>> on the spark jira.
>>
>> Can you provide the query that you are using to detect duplication so I
>> can see if your deduplication logic matches the detection query?
>>
>> -TJ
>>
>> On Sat, Jun 2, 2018 at 10:22 Aakash Basu 
>> wrote:
>>
>>> As Jay correctly suggested, if you're joining then overwrite; otherwise only
>>> append, as it removes dups.
>>>
>>> I think, in this scenario, just change it to write.mode('overwrite')
>>> because you're already reading the old data and your job would be done.
>>>
>>>
>>> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim,  wrote:
>>>
 Hi Jay,

Thanks for your response. Are you saying to append the new data, then remove
the duplicates from the whole data set afterwards, overwriting the existing
data set with the new data set with the appended values? I will give that
a try.

 Cheers,
 Ben

 On Fri, Jun 1, 2018 at 11:49 PM Jay 
 wrote:

> Benjamin,
>
> The append will append the "new" data to the existing data without removing
> the duplicates. You would need to overwrite the file every time if you need
> unique values.
>
> Thanks,
> Jayadeep
>
> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim 
> wrote:
>
>> I have a situation where I am trying to add only new rows to an existing
>> data set that lives in S3 as gzipped parquet files, looping and appending
>> for each hour of the day. First, I create a DF from the existing data, then
>> I use a query to create another DF with the data that is new. Here is the
>> code snippet.
>>
>> df = spark.read.parquet(existing_data_path)
>> df.createOrReplaceTempView('existing_data')
>> new_df = spark.read.parquet(new_data_path)
>> new_df.createOrReplaceTempView('new_data')
>> append_df = spark.sql(
>> """
>> WITH ids AS (
>> SELECT DISTINCT
>> source,
>> source_id,
>> target,
>> target_id
>> FROM new_data i
>> LEFT ANTI JOIN existing_data im
>> ON i.source = im.source
>> AND i.source_id = im.source_id
>> AND i.target = im.target
>> AND i.target = im.target_id
>> """
>> )
>> append_df.coalesce(1).write.parquet(existing_data_path,
>> mode='append', compression='gzip')
>>
>>
>> I thought this would append new rows and keep the data unique, but I am
>> seeing many duplicates. Can someone help me with this and tell me what I am
>> doing wrong?
>>
>> Thanks,
>> Ben
>>
> --
> Best Regards,
> Ayan Guha
>


Re: Append In-Place to S3

2018-06-03 Thread ayan guha
I do not use anti join semantics, but you can use a left outer join and then
filter out the nulls from the right side. Your data may have dups on the columns
separately, but it should not have dups on the composite key, i.e. all the
columns put together.
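
A small PySpark sketch contrasting the two approaches discussed in this thread, for illustration only; new_df and existing_df are the DataFrames read in the snippet quoted below, and the key list is an assumption based on its join condition.

keys = ["source", "source_id", "target", "target_id"]
cond = [new_df[k] == existing_df[k] for k in keys]

# Left anti join: rows of new_df with no match in existing_df.
only_new_anti = new_df.join(existing_df, cond, "left_anti")

# Left outer join + null filter: equivalent only when the keys are unique in
# existing_df; duplicate keys on the right would multiply matched rows before
# they get filtered out.
only_new_outer = (new_df.join(existing_df, cond, "left_outer")
                        .where(existing_df[keys[0]].isNull())
                        .select(new_df["*"]))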

On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones 
wrote:

> The issue is not the append vs overwrite - perhaps those responders do not
> know Anti join semantics. Further, Overwrite on s3 is a bad pattern due to
> s3 eventual consistency issues.
>
> First, your sql query is wrong as you don’t close the parenthesis of the
> CTE (“with” part). In fact, it looks like you don’t need that with at all,
> and the query should fail to parse. If that does parse, I would open a bug
> on the spark jira.
>
> Can you provide the query that you are using to detect duplication so I
> can see if your deduplication logic matches the detection query?
>
> -TJ
>
> On Sat, Jun 2, 2018 at 10:22 Aakash Basu 
> wrote:
>
>> As Jay correctly suggested, if you're joining then overwrite; otherwise only
>> append, as it removes dups.
>>
>> I think, in this scenario, just change it to write.mode('overwrite')
>> because you're already reading the old data and your job would be done.
>>
>>
>> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim,  wrote:
>>
>>> Hi Jay,
>>>
>>> Thanks for your response. Are you saying to append the new data, then remove
>>> the duplicates from the whole data set afterwards, overwriting the existing
>>> data set with the new data set with the appended values? I will give that
>>> a try.
>>>
>>> Cheers,
>>> Ben
>>>
>>> On Fri, Jun 1, 2018 at 11:49 PM Jay 
>>> wrote:
>>>
 Benjamin,

The append will append the "new" data to the existing data without removing
the duplicates. You would need to overwrite the file every time if you need
unique values.

 Thanks,
 Jayadeep

 On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim  wrote:

> I have a situation where I am trying to add only new rows to an existing
> data set that lives in S3 as gzipped parquet files, looping and appending
> for each hour of the day. First, I create a DF from the existing data, then
> I use a query to create another DF with the data that is new. Here is the
> code snippet.
>
> df = spark.read.parquet(existing_data_path)
> df.createOrReplaceTempView('existing_data')
> new_df = spark.read.parquet(new_data_path)
> new_df.createOrReplaceTempView('new_data')
> append_df = spark.sql(
> """
> WITH ids AS (
> SELECT DISTINCT
> source,
> source_id,
> target,
> target_id
> FROM new_data i
> LEFT ANTI JOIN existing_data im
> ON i.source = im.source
> AND i.source_id = im.source_id
> AND i.target = im.target
> AND i.target = im.target_id
> """
> )
> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
> compression='gzip')
>
>
> I thought this would append new rows and keep the data unique, but I am
> seeing many duplicates. Can someone help me with this and tell me what I am
> doing wrong?
>
> Thanks,
> Ben
>
 --
Best Regards,
Ayan Guha


Re: Append In-Place to S3

2018-06-03 Thread Tayler Lawrence Jones
The issue is not the append vs overwrite - perhaps those responders do not
know Anti join semantics. Further, Overwrite on s3 is a bad pattern due to
s3 eventual consistency issues.

First, your sql query is wrong as you don’t close the parenthesis of the
CTE (“with” part). In fact, it looks like you don’t need that with at all,
and the query should fail to parse. If that does parse, I would open a bug
on the spark jira.

Can you provide the query that you are using to detect duplication so I can
see if your deduplication logic matches the detection query?

-TJ

On Sat, Jun 2, 2018 at 10:22 Aakash Basu  wrote:

> As Jay correctly suggested, if you're joining then overwrite; otherwise only
> append, as it removes dups.
>
> I think, in this scenario, just change it to write.mode('overwrite')
> because you're already reading the old data and your job would be done.
>
>
> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim,  wrote:
>
>> Hi Jay,
>>
>> Thanks for your response. Are you saying to append the new data, then remove
>> the duplicates from the whole data set afterwards, overwriting the existing
>> data set with the new data set with the appended values? I will give that
>> a try.
>>
>> Cheers,
>> Ben
>>
>> On Fri, Jun 1, 2018 at 11:49 PM Jay  wrote:
>>
>>> Benjamin,
>>>
>>> The append will append the "new" data to the existing data without removing
>>> the duplicates. You would need to overwrite the file every time if you need
>>> unique values.
>>>
>>> Thanks,
>>> Jayadeep
>>>
>>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim  wrote:
>>>
I have a situation where I am trying to add only new rows to an existing
data set that lives in S3 as gzipped parquet files, looping and appending
for each hour of the day. First, I create a DF from the existing data, then
I use a query to create another DF with the data that is new. Here is the
code snippet.

df = spark.read.parquet(existing_data_path)
df.createOrReplaceTempView('existing_data')
new_df = spark.read.parquet(new_data_path)
new_df.createOrReplaceTempView('new_data')
 append_df = spark.sql(
 """
 WITH ids AS (
 SELECT DISTINCT
 source,
 source_id,
 target,
 target_id
 FROM new_data i
 LEFT ANTI JOIN existing_data im
 ON i.source = im.source
 AND i.source_id = im.source_id
 AND i.target = im.target
 AND i.target = im.target_id
 """
 )
 append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
compression='gzip')


I thought this would append new rows and keep the data unique, but I am
seeing many duplicates. Can someone help me with this and tell me what I am
doing wrong?

 Thanks,
 Ben

>>>
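
To make the parenthesis point above concrete, here is a hedged sketch (not from the thread) of how the query could be rewritten without the CTE; it also compares target_id to target_id in the last condition, which the original query may have intended. Table, view, and variable names are the ones from the snippet quoted above.

append_df = spark.sql("""
    SELECT DISTINCT
        i.source,
        i.source_id,
        i.target,
        i.target_id
    FROM new_data i
    LEFT ANTI JOIN existing_data im
        ON  i.source    = im.source
        AND i.source_id = im.source_id
        AND i.target    = im.target
        AND i.target_id = im.target_id
""")

append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
                                    compression='gzip')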


Re: [Spark SQL] error in performing dataset union with complex data type (struct, list)

2018-06-03 Thread Alessandro Solimando
Hi Pranav,
I don't have an answer to your issue, but what I generally do in these cases
is to first try to simplify it to a point where it is easier to check
what's going on, and then add back "pieces" one by one until I spot the
error.

In your case I can suggest to:

1) project the dataset to the problematic column only (column 21 from your
log)
2) use explode function to have one element of the array per line
3) flatten the struct

At each step use printSchema() to double check if the types are as you
expect them to be, and if they are the same for both datasets.
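
A minimal PySpark sketch of these three steps, with printSchema() at each stage; the column name col21 and the dataset names ds1/ds2 are placeholders, not from the thread.

from pyspark.sql.functions import explode, col

def inspect(ds, array_col="col21"):
    # 1) project down to the problematic array-of-structs column
    projected = ds.select(col(array_col))
    # 2) explode so that each array element becomes its own row
    exploded = projected.select(explode(col(array_col)).alias("elem"))
    # 3) flatten the struct so every field becomes a top-level column
    flattened = exploded.select("elem.*")
    projected.printSchema()
    flattened.printSchema()
    return flattened

# Run on both inputs and diff the printed schemas before attempting ds1.union(ds2).
# inspect(ds1)
# inspect(ds2)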

Best regards,
Alessandro

On 2 June 2018 at 19:48, Pranav Agrawal  wrote:

> can't get around this error when performing union of two datasets
> (ds1.union(ds2)) having complex data type (struct, list),
>
>
> *18/06/02 15:12:00 INFO ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> org.apache.spark.sql.AnalysisException: Union can only be performed on
> tables with the compatible column types.
> array>
> <>
> array>
> at the 21th column of the second table;;*
> As far as I can tell, they are the same. What am I doing wrong? Any help /
> workaround appreciated!
>
> spark version: 2.2.1
>
> Thanks,
> Pranav
>