Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-12 Thread Tathagata Das
You have understood the problem right. However, note that your
interpretation of the output *(K, leftValue, null)*, *(K, leftValue,
rightValue1)*, *(K, leftValue, rightValue2)* relies on knowing the
semantics of the join: if you are processing the output rows *manually*,
you are aware that the operator is a join, and so you can apply the
interpretation *"null replaced by first match, then all further matches
are just additional rows"*. This, however, is not a general solution for
any sink and any operator. We need to find a way to expose these semantics
through the APIs such that a sink can use them without knowing exactly
which operator in the query is writing to the sink. Therefore we still
need some work before we can support joins in update mode cleanly.

Hope that makes it clear. :)
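
For reference, the stream-stream join that Spark 2.3 does support is a
watermarked join in append mode. Below is a minimal sketch of a left outer
join in that style; the topic names, column names, and the one-hour bound
are illustrative assumptions, not from this thread. The watermarks plus the
time-range condition are what let the engine decide when an unmatched left
row is final, so *(K, leftValue, null)* can be appended once and never
needs to be retracted:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder.appName("append-join-sketch").getOrCreate()

// Left side: assumed Kafka topic "left_topic" with string keys and values.
val left = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "left_topic").load()
  .selectExpr("CAST(key AS STRING) AS k",
              "CAST(value AS STRING) AS leftValue",
              "timestamp AS leftTime")

// Right side: assumed Kafka topic "right_topic".
val right = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "right_topic").load()
  .selectExpr("CAST(key AS STRING) AS rk",
              "CAST(value AS STRING) AS rightValue",
              "timestamp AS rightTime")

// Watermarks bound the buffered state; the BETWEEN clause tells Spark how
// long a left row must wait for a match before the null-padded row is
// emitted (exactly once, in append mode).
val joined = left.withWatermark("leftTime", "10 minutes")
  .join(
    right.withWatermark("rightTime", "20 minutes"),
    expr("k = rk AND rightTime BETWEEN leftTime AND leftTime + interval 1 hour"),
    "leftOuter")

joined.writeStream.outputMode("append").format("console").start()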

On Sat, Mar 10, 2018 at 12:14 AM, kant kodali  wrote:

> I will give an attempt to answer this.
>
> since rightValue1 and rightValue2 have the same key "K" (two matches), why
> would it ever be the case that *rightValue2* replaces *rightValue1*, which
> replaces *null*? Moreover, why does the user need to care?
>
> The result in this case (after getting 2 matches) should be
>
> *(K, leftValue, rightValue1)*
> *(K, leftValue, rightValue2)*
>
> This basically means only one of them replaced the earlier null. Which one
> of the two? Whichever arrived first. In other words, nulls will be replaced
> by the first matching row; subsequently, if there is a new matching row it
> will just be another row with the same key in the result table, and if
> there is a new unmatched row then the result table should have nulls for
> the unmatched fields.
>
> From a user perspective, I believe just spitting out nulls on every
> trigger until there is a match, and spitting out the joined rows once
> there is a match, should suffice, shouldn't it?
>
> Sorry if my thoughts are too naive!
>
> On Thu, Mar 8, 2018 at 6:14 PM, Tathagata Das wrote:
>
>> This doc is unrelated to the stream-stream join we added in Structured
>> Streaming. :)
>>
>> That said, we added append mode first because it is easier to reason about
>> the semantics of append mode, especially in the context of outer joins. You
>> output a row only when you know it won't ever be changed. The semantics of
>> update mode in outer joins are trickier to reason about and expose through
>> the APIs. Consider a left outer join. As soon as we get a left-side record
>> with a key K that does not have a match, do we output *(K, leftValue,
>> null)*? And if we do so, and then later get 2 matches from the right side,
>> we have to output *(K, leftValue, rightValue1)* and *(K, leftValue,
>> rightValue2)*. But how do we convey that *rightValue1* and *rightValue2*
>> together replace the earlier *null*, rather than *rightValue2* replacing
>> *rightValue1* replacing *null*?
>>
>> We will figure these out in future releases. For now, we have released
>> append mode, which allows quite a large range of use cases, including
>> multiple cascading joins.
>>
>> TD
>>
>>
>>
>> On Thu, Mar 8, 2018 at 9:18 AM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> super interesting.
>>>
>>> On Wed, Mar 7, 2018 at 11:44 AM, kant kodali  wrote:
>>>
 It looks to me that the StateStore described in this doc actually has
 full outer join, and every other join is a filter of that. Also, the doc
 talks about update mode, but it looks like Spark 2.3 ended up with append
 mode? Anyway, the moment it is in master I am ready to test, so JIRA
 tickets on this would help to keep track. Please let me know.

 Thanks!

 On Tue, Mar 6, 2018 at 9:16 PM, kant kodali  wrote:

> Sorry I meant Spark 2.4 in my previous email
>
> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali 
> wrote:
>
>> Hi TD,
>>
>> I agree; I think we are better off either with a full fix or no fix. I
>> am OK with the complete fix being available in master or some branch. I
>> guess the solution for me is to just build from source.
>>
>> On a similar note, I am not finding any JIRA tickets related to full
>> outer joins and update mode for, say, Spark 2.3. I wonder how hard it is
>> to implement both of these? It turns out update mode and full outer join
>> are very useful and required in my case; therefore, I'm just asking.
>>
>> Thanks!
>>
>> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> I thought about it.
>>> I am not 100% sure whether this fix should go into 2.3.1.
>>>
>>> There are two parts to this bug fix to enable self-joins.
>>>
>>> 1. Enabling deduping of leaf logical nodes by extending
>>> MultiInstanceRelation
>>> 

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-10 Thread kant kodali
I will give an attempt to answer this.

since rightValue1 and rightValue2 have the same key "K" (two matches), why
would it ever be the case that *rightValue2* replaces *rightValue1*, which
replaces *null*? Moreover, why does the user need to care?

The result in this case (after getting 2 matches) should be

*(K, leftValue, rightValue1)*
*(K, leftValue, rightValue2)*

This basically means only one of them replaced the earlier null. Which one
of the two? Whichever arrived first. In other words, nulls will be replaced
by the first matching row; subsequently, if there is a new matching row it
will just be another row with the same key in the result table, and if
there is a new unmatched row then the result table should have nulls for
the unmatched fields.

From a user perspective, I believe just spitting out nulls on every
trigger until there is a match, and spitting out the joined rows once
there is a match, should suffice, shouldn't it?

Sorry if my thoughts are too naive!

On Thu, Mar 8, 2018 at 6:14 PM, Tathagata Das 
wrote:

> This doc is unrelated to the stream-stream join we added in Structured
> Streaming. :)
>
> That said, we added append mode first because it is easier to reason about
> the semantics of append mode, especially in the context of outer joins. You
> output a row only when you know it won't ever be changed. The semantics of
> update mode in outer joins are trickier to reason about and expose through
> the APIs. Consider a left outer join. As soon as we get a left-side record
> with a key K that does not have a match, do we output *(K, leftValue,
> null)*? And if we do so, and then later get 2 matches from the right side,
> we have to output *(K, leftValue, rightValue1)* and *(K, leftValue,
> rightValue2)*. But how do we convey that *rightValue1* and *rightValue2*
> together replace the earlier *null*, rather than *rightValue2* replacing
> *rightValue1* replacing *null*?
>
> We will figure these out in future releases. For now, we have released
> append mode, which allows quite a large range of use cases, including
> multiple cascading joins.
>
> TD
>
>
>
> On Thu, Mar 8, 2018 at 9:18 AM, Gourav Sengupta wrote:
>
>> super interesting.
>>
>> On Wed, Mar 7, 2018 at 11:44 AM, kant kodali  wrote:
>>
>>> It looks to me that the StateStore described in this doc actually has
>>> full outer join, and every other join is a filter of that. Also, the doc
>>> talks about update mode, but it looks like Spark 2.3 ended up with append
>>> mode? Anyway, the moment it is in master I am ready to test, so JIRA
>>> tickets on this would help to keep track. Please let me know.
>>>
>>> Thanks!
>>>
>>> On Tue, Mar 6, 2018 at 9:16 PM, kant kodali  wrote:
>>>
 Sorry I meant Spark 2.4 in my previous email

 On Tue, Mar 6, 2018 at 9:15 PM, kant kodali  wrote:

> Hi TD,
>
> I agree; I think we are better off either with a full fix or no fix. I
> am OK with the complete fix being available in master or some branch. I
> guess the solution for me is to just build from source.
>
> On a similar note, I am not finding any JIRA tickets related to full
> outer joins and update mode for, say, Spark 2.3. I wonder how hard it is
> to implement both of these? It turns out update mode and full outer join
> are very useful and required in my case; therefore, I'm just asking.
>
> Thanks!
>
> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> I thought about it.
>> I am not 100% sure whether this fix should go into 2.3.1.
>>
>> There are two parts to this bug fix to enable self-joins.
>>
>> 1. Enabling deduping of leaf logical nodes by extending
>> MultiInstanceRelation
>>   - This is safe to be backported into the 2.3 branch as it does not
>> touch production code paths.
>>
>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>> micro-batch plan is spliced into the streaming plan.
>>   - This touches core production code paths and, therefore, may not be
>> safe to backport.
>>
>> Part 1 enables self-joins in all but a small fraction of self-join
>> queries. That small fraction can produce incorrect results, and part 2
>> avoids that.
>>
>> So for 2.3.1, we can enable self-joins by merging only part 1, but it
>> can give wrong results in some cases. I think that is strictly worse than
>> no fix.
>>
>> TD
>>
>>
>>
>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali 
>> wrote:
>>
>>> Hi TD,
>>>
>>> I pulled your commit that is listed on this ticket
>>> https://issues.apache.org/jira/browse/SPARK-23406; specifically, I
>>> did the following steps, and self joins work after I cherry-picked
>>> your commit!

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-08 Thread Tathagata Das
This doc is unrelated to the stream-stream join we added in Structured
Streaming. :)

That said, we added append mode first because it is easier to reason about
the semantics of append mode, especially in the context of outer joins. You
output a row only when you know it won't ever be changed. The semantics of
update mode in outer joins are trickier to reason about and expose through
the APIs. Consider a left outer join. As soon as we get a left-side record
with a key K that does not have a match, do we output *(K, leftValue, null)*?
And if we do so, and then later get 2 matches from the right side, we have to
output *(K, leftValue, rightValue1)* and *(K, leftValue, rightValue2)*. But
how do we convey that *rightValue1* and *rightValue2* together replace the
earlier *null*, rather than *rightValue2* replacing *rightValue1* replacing
*null*?

We will figure these out in future releases. For now, we have released
append mode, which allows quite a large range of use cases, including
multiple cascading joins.

TD



On Thu, Mar 8, 2018 at 9:18 AM, Gourav Sengupta 
wrote:

> super interesting.
>
> On Wed, Mar 7, 2018 at 11:44 AM, kant kodali  wrote:
>
>> It looks to me that the StateStore described in this doc actually has
>> full outer join, and every other join is a filter of that. Also, the doc
>> talks about update mode, but it looks like Spark 2.3 ended up with append
>> mode? Anyway, the moment it is in master I am ready to test, so JIRA
>> tickets on this would help to keep track. Please let me know.
>>
>> Thanks!
>>
>> On Tue, Mar 6, 2018 at 9:16 PM, kant kodali  wrote:
>>
>>> Sorry I meant Spark 2.4 in my previous email
>>>
>>> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali  wrote:
>>>
 Hi TD,

 I agree; I think we are better off either with a full fix or no fix. I am
 OK with the complete fix being available in master or some branch. I
 guess the solution for me is to just build from source.

 On a similar note, I am not finding any JIRA tickets related to full
 outer joins and update mode for, say, Spark 2.3. I wonder how hard it is
 to implement both of these? It turns out update mode and full outer join
 are very useful and required in my case; therefore, I'm just asking.

 Thanks!

 On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> I thought about it.
> I am not 100% sure whether this fix should go into 2.3.1.
>
> There are two parts to this bug fix to enable self-joins.
>
> 1. Enabling deduping of leaf logical nodes by extending
> MultiInstanceRelation
>   - This is safe to be backported into the 2.3 branch as it does not
> touch production code paths.
>
> 2. Fixing attribute rewriting in MicroBatchExecution, when the
> micro-batch plan is spliced into the streaming plan.
>   - This touches core production code paths and, therefore, may not be
> safe to backport.
>
> Part 1 enables self-joins in all but a small fraction of self-join
> queries. That small fraction can produce incorrect results, and part 2
> avoids that.
>
> So for 2.3.1, we can enable self-joins by merging only part 1, but it
> can give wrong results in some cases. I think that is strictly worse than
> no fix.
>
> TD
>
>
>
> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali 
> wrote:
>
>> Hi TD,
>>
>> I pulled your commit that is listed on this ticket
>> https://issues.apache.org/jira/browse/SPARK-23406; specifically, I did
>> the following steps, and self joins work after I cherry-picked your commit!
>> Good job! I was hoping it would be part of 2.3.0, but it looks like it is
>> targeted for 2.3.1 :(
>>
>> git clone https://github.com/apache/spark.git
>> cd spark
>> git fetch
>> git checkout branch-2.3
>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>> ./build/mvn -DskipTests compile
>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr 
>> -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>
>>
>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> Thanks for testing out stream-stream joins and reporting this issue.
>>> I am going to take a look at this.
>>>
>>> TD
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali 
>>> wrote:
>>>
 if I change it to the below code it works. However, I don't believe
 it is the solution I am looking for. I want to be able to do it in raw
 SQL, and moreover, if a user gives a big chained raw Spark SQL join
 query, I am not even sure how to make copies of the dataframe to achieve
 the self-join. Is there any other way here?

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-08 Thread Gourav Sengupta
super interesting.

On Wed, Mar 7, 2018 at 11:44 AM, kant kodali  wrote:

> It looks to me that the StateStore described in this doc actually has
> full outer join, and every other join is a filter of that. Also, the doc
> talks about update mode, but it looks like Spark 2.3 ended up with append
> mode? Anyway, the moment it is in master I am ready to test, so JIRA
> tickets on this would help to keep track. Please let me know.
>
> Thanks!
>
> On Tue, Mar 6, 2018 at 9:16 PM, kant kodali  wrote:
>
>> Sorry I meant Spark 2.4 in my previous email
>>
>> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali  wrote:
>>
>>> Hi TD,
>>>
>>> I agree; I think we are better off either with a full fix or no fix. I am
>>> OK with the complete fix being available in master or some branch. I guess
>>> the solution for me is to just build from source.
>>>
>>> On a similar note, I am not finding any JIRA tickets related to full
>>> outer joins and update mode for, say, Spark 2.3. I wonder how hard it is
>>> to implement both of these? It turns out update mode and full outer join
>>> are very useful and required in my case; therefore, I'm just asking.
>>>
>>> Thanks!
>>>
>>> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 I thought about it.
 I am not 100% sure whether this fix should go into 2.3.1.

 There are two parts to this bug fix to enable self-joins.

 1. Enabling deduping of leaf logical nodes by extending
 MultiInstanceRelation
   - This is safe to be backported into the 2.3 branch as it does not
 touch production code paths.

 2. Fixing attribute rewriting in MicroBatchExecution, when the
 micro-batch plan is spliced into the streaming plan.
   - This touches core production code paths and, therefore, may not be
 safe to backport.

 Part 1 enables self-joins in all but a small fraction of self-join
 queries. That small fraction can produce incorrect results, and part 2
 avoids that.

 So for 2.3.1, we can enable self-joins by merging only part 1, but it
 can give wrong results in some cases. I think that is strictly worse than
 no fix.

 TD



 On Thu, Feb 22, 2018 at 2:32 PM, kant kodali 
 wrote:

> Hi TD,
>
> I pulled your commit that is listed on this ticket
> https://issues.apache.org/jira/browse/SPARK-23406; specifically, I did
> the following steps, and self joins work after I cherry-picked your commit!
> Good job! I was hoping it would be part of 2.3.0, but it looks like it is
> targeted for 2.3.1 :(
>
> git clone https://github.com/apache/spark.git
> cd spark
> git fetch
> git checkout branch-2.3
> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
> ./build/mvn -DskipTests compile
> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr 
> -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>
>
> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hey,
>>
>> Thanks for testing out stream-stream joins and reporting this issue.
>> I am going to take a look at this.
>>
>> TD
>>
>>
>>
>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali 
>> wrote:
>>
>>> if I change it to the below code it works. However, I don't believe
>>> it is the solution I am looking for. I want to be able to do it in raw
>>> SQL, and moreover, if a user gives a big chained raw Spark SQL join
>>> query, I am not even sure how to make copies of the dataframe to
>>> achieve the self-join. Is there any other way here?
>>>
>>>
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>> val jdf1 = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>> jdf1.createOrReplaceTempView("table1")
>>>
>>> val resultdf = spark.sql("select * from table inner join table1 on 
>>> table.offset=table1.offset")
>>>
>>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali 
>>> wrote:
>>>
 If I change it to this


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-07 Thread kant kodali
It looks to me that the StateStore described in this doc actually has
full outer join, and every other join is a filter of that. Also, the doc
talks about update mode, but it looks like Spark 2.3 ended up with append
mode? Anyway, the moment it is in master I am ready to test, so JIRA
tickets on this would help to keep track. Please let me know.

Thanks!

On Tue, Mar 6, 2018 at 9:16 PM, kant kodali  wrote:

> Sorry I meant Spark 2.4 in my previous email
>
> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali  wrote:
>
>> Hi TD,
>>
>> I agree; I think we are better off either with a full fix or no fix. I am
>> OK with the complete fix being available in master or some branch. I guess
>> the solution for me is to just build from source.
>>
>> On a similar note, I am not finding any JIRA tickets related to full
>> outer joins and update mode for, say, Spark 2.3. I wonder how hard it is
>> to implement both of these? It turns out update mode and full outer join
>> are very useful and required in my case; therefore, I'm just asking.
>>
>> Thanks!
>>
>> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> I thought about it.
>>> I am not 100% sure whether this fix should go into 2.3.1.
>>>
>>> There are two parts to this bug fix to enable self-joins.
>>>
>>> 1. Enabling deduping of leaf logical nodes by extending
>>> MultiInstanceRelation
>>>   - This is safe to be backported into the 2.3 branch as it does not
>>> touch production code paths.
>>>
>>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>>> micro-batch plan is spliced into the streaming plan.
>>>   - This touches core production code paths and, therefore, may not be
>>> safe to backport.
>>>
>>> Part 1 enables self-joins in all but a small fraction of self-join
>>> queries. That small fraction can produce incorrect results, and part 2
>>> avoids that.
>>>
>>> So for 2.3.1, we can enable self-joins by merging only part 1, but it
>>> can give wrong results in some cases. I think that is strictly worse than
>>> no fix.
>>>
>>> TD
>>>
>>>
>>>
>>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali  wrote:
>>>
 Hi TD,

 I pulled your commit that is listed on this ticket
 https://issues.apache.org/jira/browse/SPARK-23406; specifically, I did
 the following steps, and self joins work after I cherry-picked your commit!
 Good job! I was hoping it would be part of 2.3.0, but it looks like it is
 targeted for 2.3.1 :(

 git clone https://github.com/apache/spark.git
 cd spark
 git fetch
 git checkout branch-2.3
 git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
 export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
 ./build/mvn -DskipTests compile
 ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr 
 -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn


 On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> Hey,
>
> Thanks for testing out stream-stream joins and reporting this issue. I
> am going to take a look at this.
>
> TD
>
>
>
> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali 
> wrote:
>
>> if I change it to the below code it works. However, I don't believe
>> it is the solution I am looking for. I want to be able to do it in raw
>> SQL, and moreover, if a user gives a big chained raw Spark SQL join
>> query, I am not even sure how to make copies of the dataframe to
>> achieve the self-join. Is there any other way here?
>>
>>
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = 
>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", 
>> "join_test").option("startingOffsets", "earliest").load();
>> val jdf1 = 
>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", 
>> "join_test").option("startingOffsets", "earliest").load();
>>
>> jdf.createOrReplaceTempView("table")
>> jdf1.createOrReplaceTempView("table1")
>>
>> val resultdf = spark.sql("select * from table inner join table1 on 
>> table.offset=table1.offset")
>>
>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>
>>
>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali 
>> wrote:
>>
>>> If I change it to this
>>>
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali 
>>> wrote:
>>>
 Hi All,

 I have the following code

 import org.apache.spark.sql.streaming.Trigger

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread kant kodali
Sorry I meant Spark 2.4 in my previous email

On Tue, Mar 6, 2018 at 9:15 PM, kant kodali  wrote:

> Hi TD,
>
> I agree; I think we are better off either with a full fix or no fix. I am
> OK with the complete fix being available in master or some branch. I guess
> the solution for me is to just build from source.
>
> On a similar note, I am not finding any JIRA tickets related to full outer
> joins and update mode for, say, Spark 2.3. I wonder how hard it is to
> implement both of these? It turns out update mode and full outer join
> are very useful and required in my case; therefore, I'm just asking.
>
> Thanks!
>
> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das wrote:
>
>> I thought about it.
>> I am not 100% sure whether this fix should go into 2.3.1.
>>
>> There are two parts to this bug fix to enable self-joins.
>>
>> 1. Enabling deduping of leaf logical nodes by extending
>> MultiInstanceRelation
>>   - This is safe to be backported into the 2.3 branch as it does not
>> touch production code paths.
>>
>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>> micro-batch plan is spliced into the streaming plan.
>>   - This touches core production code paths and, therefore, may not be
>> safe to backport.
>>
>> Part 1 enables self-joins in all but a small fraction of self-join
>> queries. That small fraction can produce incorrect results, and part 2
>> avoids that.
>>
>> So for 2.3.1, we can enable self-joins by merging only part 1, but it can
>> give wrong results in some cases. I think that is strictly worse than no
>> fix.
>>
>> TD
>>
>>
>>
>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali  wrote:
>>
>>> Hi TD,
>>>
>>> I pulled your commit that is listed on this ticket
>>> https://issues.apache.org/jira/browse/SPARK-23406; specifically, I did
>>> the following steps, and self joins work after I cherry-picked your commit!
>>> Good job! I was hoping it would be part of 2.3.0, but it looks like it is
>>> targeted for 2.3.1 :(
>>>
>>> git clone https://github.com/apache/spark.git
>>> cd spark
>>> git fetch
>>> git checkout branch-2.3
>>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>>> ./build/mvn -DskipTests compile
>>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr 
>>> -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>>
>>>
>>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Hey,

 Thanks for testing out stream-stream joins and reporting this issue. I
 am going to take a look at this.

 TD



 On Tue, Feb 20, 2018 at 8:20 PM, kant kodali 
 wrote:

> if I change it to the below code it works. However, I don't believe it
> is the solution I am looking for. I want to be able to do it in raw SQL,
> and moreover, if a user gives a big chained raw Spark SQL join query, I
> am not even sure how to make copies of the dataframe to achieve the
> self-join. Is there any other way here?
>
>
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = 
> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", 
> "join_test").option("startingOffsets", "earliest").load();
> val jdf1 = 
> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", 
> "join_test").option("startingOffsets", "earliest").load();
>
> jdf.createOrReplaceTempView("table")
> jdf1.createOrReplaceTempView("table1")
>
> val resultdf = spark.sql("select * from table inner join table1 on 
> table.offset=table1.offset")
>
> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
>
> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali 
> wrote:
>
>> If I change it to this
>>
>>
>>
>>
>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali 
>> wrote:
>>
>>> Hi All,
>>>
>>> I have the following code
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>>
>>> val resultdf = spark.sql("select * from table as x inner join table as 
>>> y on x.offset=y.offset")
>>>
>>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>> and I get the following exception.
>>>
>>> 

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread kant kodali
Hi TD,

I agree; I think we are better off either with a full fix or no fix. I am OK
with the complete fix being available in master or some branch. I guess the
solution for me is to just build from source.

On a similar note, I am not finding any JIRA tickets related to full outer
joins and update mode for, say, Spark 2.3. I wonder how hard it is to
implement both of these? It turns out update mode and full outer join
are very useful and required in my case; therefore, I'm just asking.

Thanks!

On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das 
wrote:

> I thought about it.
> I am not 100% sure whether this fix should go into 2.3.1.
>
> There are two parts to this bug fix to enable self-joins.
>
> 1. Enabling deduping of leaf logical nodes by extending
> MultiInstanceRelation
>   - This is safe to be backported into the 2.3 branch as it does not touch
> production code paths.
>
> 2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
> plan is spliced into the streaming plan.
>   - This touches core production code paths and, therefore, may not be
> safe to backport.
>
> Part 1 enables self-joins in all but a small fraction of self-join
> queries. That small fraction can produce incorrect results, and part 2
> avoids that.
>
> So for 2.3.1, we can enable self-joins by merging only part 1, but it can
> give wrong results in some cases. I think that is strictly worse than no
> fix.
>
> TD
>
>
>
> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali  wrote:
>
>> Hi TD,
>>
>> I pulled your commit that is listed on this ticket
>> https://issues.apache.org/jira/browse/SPARK-23406; specifically, I did the
>> following steps, and self joins work after I cherry-picked your commit!
>> Good job! I was hoping it would be part of 2.3.0, but it looks like it is
>> targeted for 2.3.1 :(
>>
>> git clone https://github.com/apache/spark.git
>> cd spark
>> git fetch
>> git checkout branch-2.3
>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>> ./build/mvn -DskipTests compile
>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr 
>> -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>
>>
>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> Thanks for testing out stream-stream joins and reporting this issue. I
>>> am going to take a look at this.
>>>
>>> TD
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:
>>>
 if I change it to the below code it works. However, I don't believe it
 is the solution I am looking for. I want to be able to do it in raw SQL,
 and moreover, if a user gives a big chained raw Spark SQL join query, I
 am not even sure how to make copies of the dataframe to achieve the
 self-join. Is there any other way here?



 import org.apache.spark.sql.streaming.Trigger

 val jdf = 
 spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
 "localhost:9092").option("subscribe", 
 "join_test").option("startingOffsets", "earliest").load();
 val jdf1 = 
 spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
 "localhost:9092").option("subscribe", 
 "join_test").option("startingOffsets", "earliest").load();

 jdf.createOrReplaceTempView("table")
 jdf1.createOrReplaceTempView("table1")

 val resultdf = spark.sql("select * from table inner join table1 on 
 table.offset=table1.offset")

 resultdf.writeStream.outputMode("append").format("console").option("truncate",
  false).trigger(Trigger.ProcessingTime(1000)).start()


 On Tue, Feb 20, 2018 at 8:16 PM, kant kodali 
 wrote:

> If I change it to this
>
>
>
>
> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali 
> wrote:
>
>> Hi All,
>>
>> I have the following code
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = 
>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", 
>> "join_test").option("startingOffsets", "earliest").load();
>>
>> jdf.createOrReplaceTempView("table")
>>
>> val resultdf = spark.sql("select * from table as x inner join table as y 
>> on x.offset=y.offset")
>>
>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>
>> and I get the following exception.
>>
>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' 
>> given input columns: [x.value, x.offset, x.key, x.timestampType, 
>> x.topic, x.timestamp, x.partition]; line 1 pos 50;
>> 'Project [*]
>> +- 'Join Inner, ('x.offset = 'y.offset)
>>:- SubqueryAlias x
>> 

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread Tathagata Das
I thought about it.
I am not 100% sure whether this fix should go into 2.3.1.

There are two parts to this bug fix to enable self-joins.

1. Enabling deduping of leaf logical nodes by extending
MultiInstanceRelation
  - This is safe to be backported into the 2.3 branch as it does not touch
production code paths.

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
plan is spliced into the streaming plan.
  - This touches core production code paths and, therefore, may not be
safe to backport.

Part 1 enables self-joins in all but a small fraction of self-join queries.
That small fraction can produce incorrect results, and part 2 avoids that.

So for 2.3.1, we can enable self-joins by merging only part 1, but it can
give wrong results in some cases. I think that is strictly worse than no
fix.

TD
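
For anyone wanting to check whether a given build carries the fix, here is
a minimal self-join smoke test (a sketch; the view and alias names are
arbitrary, the built-in rate source is used so no Kafka setup is needed,
and "spark" is the spark-shell session, as in the other snippets in this
thread). On an unpatched 2.3.0 build, the aliased self-join below fails to
analyze with the AnalysisException quoted later in this thread; with part 1
of the fix it should start streaming:

import org.apache.spark.sql.streaming.Trigger

// The rate source emits (timestamp, value) rows, one per second here.
val df = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
df.createOrReplaceTempView("t")

// The same streaming view joined with itself under two aliases.
val selfJoined = spark.sql(
  "select x.value, y.timestamp from t as x inner join t as y on x.value = y.value")

selfJoined.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime(1000))
  .start()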



On Thu, Feb 22, 2018 at 2:32 PM, kant kodali  wrote:

> Hi TD,
>
> I pulled your commit that is listed on this ticket
> https://issues.apache.org/jira/browse/SPARK-23406; specifically, I did the
> following steps, and self joins work after I cherry-picked your commit!
> Good job! I was hoping it would be part of 2.3.0, but it looks like it is
> targeted for 2.3.1 :(
>
> git clone https://github.com/apache/spark.git
> cd spark
> git fetch
> git checkout branch-2.3
> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
> ./build/mvn -DskipTests compile
> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr 
> -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>
>
> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hey,
>>
>> Thanks for testing out stream-stream joins and reporting this issue. I am
>> going to take a look at this.
>>
>> TD
>>
>>
>>
>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:
>>
>>> if I change it to the below code it works. However, I don't believe it
>>> is the solution I am looking for. I want to be able to do it in raw SQL,
>>> and moreover, if a user gives a big chained raw Spark SQL join query, I
>>> am not even sure how to make copies of the dataframe to achieve the
>>> self-join. Is there any other way here?
>>>
>>>
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>> val jdf1 = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>> jdf1.createOrReplaceTempView("table1")
>>>
>>> val resultdf = spark.sql("select * from table inner join table1 on 
>>> table.offset=table1.offset")
>>>
>>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:
>>>
 If I change it to this




 On Tue, Feb 20, 2018 at 7:52 PM, kant kodali 
 wrote:

> Hi All,
>
> I have the following code
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = 
> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", 
> "join_test").option("startingOffsets", "earliest").load();
>
> jdf.createOrReplaceTempView("table")
>
> val resultdf = spark.sql("select * from table as x inner join table as y 
> on x.offset=y.offset")
>
> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
> and I get the following exception.
>
> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
> x.timestamp, x.partition]; line 1 pos 50;
> 'Project [*]
> +- 'Join Inner, ('x.offset = 'y.offset)
>:- SubqueryAlias x
>:  +- SubqueryAlias table
>: +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
> offset#32L, timestamp#33, timestampType#34]
>+- SubqueryAlias y
>   +- SubqueryAlias table
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
> 

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-22 Thread kant kodali
Hi TD,

I pulled your commit that is listed on this ticket
https://issues.apache.org/jira/browse/SPARK-23406; specifically, I did the
following steps, and self joins work after I cherry-picked your commit!
Good job! I was hoping it would be part of 2.3.0, but it looks like it is
targeted for 2.3.1 :(

git clone https://github.com/apache/spark.git
cd spark
git fetch
git checkout branch-2.3
git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests compile
./dev/make-distribution.sh --name custom-spark --pip --r --tgz
-Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn


On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das  wrote:

> Hey,
>
> Thanks for testing out stream-stream joins and reporting this issue. I am
> going to take a look at this.
>
> TD
>
>
>
> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:
>
>> if I change it to the below code it works. However, I don't believe it is
>> the solution I am looking for. I want to be able to do it in raw SQL, and
>> moreover, if a user gives a big chained raw Spark SQL join query, I am not
>> even sure how to make copies of the dataframe to achieve the self-join. Is
>> there any other way here?
>>
>>
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
>> "earliest").load();
>> val jdf1 = 
>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
>> "earliest").load();
>>
>> jdf.createOrReplaceTempView("table")
>> jdf1.createOrReplaceTempView("table1")
>>
>> val resultdf = spark.sql("select * from table inner join table1 on 
>> table.offset=table1.offset")
>>
>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>
>>
>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:
>>
>>> If I change it to this
>>>
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:
>>>
 Hi All,

 I have the following code

 import org.apache.spark.sql.streaming.Trigger

 val jdf = 
 spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
 "localhost:9092").option("subscribe", 
 "join_test").option("startingOffsets", "earliest").load();

 jdf.createOrReplaceTempView("table")

 val resultdf = spark.sql("select * from table as x inner join table as y 
 on x.offset=y.offset")

 resultdf.writeStream.outputMode("update").format("console").option("truncate",
  false).trigger(Trigger.ProcessingTime(1000)).start()

 and I get the following exception.

 org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
 input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
 x.timestamp, x.partition]; line 1 pos 50;
 'Project [*]
 +- 'Join Inner, ('x.offset = 'y.offset)
:- SubqueryAlias x
:  +- SubqueryAlias table
: +- StreamingRelation 
 DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
 localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
 offset#32L, timestamp#33, timestampType#34]
+- SubqueryAlias y
   +- SubqueryAlias table
  +- StreamingRelation 
 DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
 localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
 offset#32L, timestamp#33, timestampType#34]

 any idea what's wrong here?

 Thanks!



Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-22 Thread Tathagata Das
Hey,

Thanks for testing out stream-stream joins and reporting this issue. I am
going to take a look at this.

TD



On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:

> if I change it to the below code it works. However, I don't believe it is
> the solution I am looking for. I want to be able to do it in raw SQL, and
> moreover, if a user gives a big chained raw Spark SQL join query, I am not
> even sure how to make copies of the dataframe to achieve the self-join. Is
> there any other way here?
>
>
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
> "earliest").load();
> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
> "earliest").load();
>
> jdf.createOrReplaceTempView("table")
> jdf1.createOrReplaceTempView("table1")
>
> val resultdf = spark.sql("select * from table inner join table1 on 
> table.offset=table1.offset")
>
> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
>
> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:
>
>> If I change it to this
>>
>>
>>
>>
>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I have the following code
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>>
>>> val resultdf = spark.sql("select * from table as x inner join table as y on 
>>> x.offset=y.offset")
>>>
>>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>> and I get the following exception.
>>>
>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
>>> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
>>> x.timestamp, x.partition]; line 1 pos 50;
>>> 'Project [*]
>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>:- SubqueryAlias x
>>>:  +- SubqueryAlias table
>>>: +- StreamingRelation 
>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>> offset#32L, timestamp#33, timestampType#34]
>>>+- SubqueryAlias y
>>>   +- SubqueryAlias table
>>>  +- StreamingRelation 
>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>> offset#32L, timestamp#33, timestampType#34]
>>>
>>> any idea what's wrong here?
>>>
>>> Thanks!
>>>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-20 Thread kant kodali
if I change it to the below code it works. However, I don't believe it is
the solution I am looking for. I want to be able to do it in raw SQL, and
moreover, if a user gives a big chained raw Spark SQL join query, I am not
even sure how to make copies of the dataframe to achieve the self-join. Is
there any other way here?



import org.apache.spark.sql.streaming.Trigger

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe",
"join_test").option("startingOffsets", "earliest").load();
val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe",
"join_test").option("startingOffsets", "earliest").load();

jdf.createOrReplaceTempView("table")
jdf1.createOrReplaceTempView("table1")

val resultdf = spark.sql("select * from table inner join table1 on
table.offset=table1.offset")

resultdf.writeStream.outputMode("append").format("console").option("truncate",
false).trigger(Trigger.ProcessingTime(1000)).start()
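
On the chained-query concern above: one stopgap, sketched below under the
assumption of the same Kafka source used in this thread (the helper name
registerCopies is made up here), is to pre-register as many distinct views
of the source as the raw SQL needs. Each readStream call builds an
independent logical plan, which is what sidesteps the duplicate-attribute
problem until the proper fix lands:

// Registers n independent copies of the same Kafka source as the views
// "table1" .. "tableN", one per alias the raw SQL query will reference.
def registerCopies(baseName: String, n: Int): Unit =
  (1 to n).foreach { i =>
    val copy = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "join_test")
      .option("startingOffsets", "earliest")
      .load()
    copy.createOrReplaceTempView(s"$baseName$i")
  }

registerCopies("table", 2)  // registers "table1" and "table2"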


On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:

> If I change it to this
>
>
>
>
> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> I have the following code
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
>> "earliest").load();
>>
>> jdf.createOrReplaceTempView("table")
>>
>> val resultdf = spark.sql("select * from table as x inner join table as y on 
>> x.offset=y.offset")
>>
>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>
>> and I get the following exception.
>>
>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
>> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
>> x.timestamp, x.partition]; line 1 pos 50;
>> 'Project [*]
>> +- 'Join Inner, ('x.offset = 'y.offset)
>>:- SubqueryAlias x
>>:  +- SubqueryAlias table
>>: +- StreamingRelation 
>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>> offset#32L, timestamp#33, timestampType#34]
>>+- SubqueryAlias y
>>   +- SubqueryAlias table
>>  +- StreamingRelation 
>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>> offset#32L, timestamp#33, timestampType#34]
>>
>> any idea what's wrong here?
>>
>> Thanks!
>>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-20 Thread kant kodali
If I change it to this




On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:

> Hi All,
>
> I have the following code
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
> "earliest").load();
>
> jdf.createOrReplaceTempView("table")
>
> val resultdf = spark.sql("select * from table as x inner join table as y on 
> x.offset=y.offset")
>
> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
> and I get the following exception.
>
> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
> x.timestamp, x.partition]; line 1 pos 50;
> 'Project [*]
> +- 'Join Inner, ('x.offset = 'y.offset)
>:- SubqueryAlias x
>:  +- SubqueryAlias table
>: +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
> offset#32L, timestamp#33, timestampType#34]
>+- SubqueryAlias y
>   +- SubqueryAlias table
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
> offset#32L, timestamp#33, timestampType#34]
>
> any idea what's wrong here?
>
> Thanks!
>


what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-20 Thread kant kodali
Hi All,

I have the following code

import org.apache.spark.sql.streaming.Trigger

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe",
"join_test").option("startingOffsets", "earliest").load();

jdf.createOrReplaceTempView("table")

val resultdf = spark.sql("select * from table as x inner join table as
y on x.offset=y.offset")

resultdf.writeStream.outputMode("update").format("console").option("truncate",
false).trigger(Trigger.ProcessingTime(1000)).start()

and I get the following exception.

org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`'
given input columns: [x.value, x.offset, x.key, x.timestampType,
x.topic, x.timestamp, x.partition]; line 1 pos 50;
'Project [*]
+- 'Join Inner, ('x.offset = 'y.offset)
   :- SubqueryAlias x
   :  +- SubqueryAlias table
   : +- StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
-> earliest, subscribe -> join_test, kafka.bootstrap.servers ->
localhost:9092),None), kafka, [key#28, value#29, topic#30,
partition#31, offset#32L, timestamp#33, timestampType#34]
   +- SubqueryAlias y
  +- SubqueryAlias table
 +- StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
-> earliest, subscribe -> join_test, kafka.bootstrap.servers ->
localhost:9092),None), kafka, [key#28, value#29, topic#30,
partition#31, offset#32L, timestamp#33, timestampType#34]

any idea what's wrong here?

Thanks!