Hi, can you help me with the doubts below? Any links would also be helpful.
Thanks,
Sid

On Wed, Feb 23, 2022 at 1:22 AM Sid Kal <flinkbyhe...@gmail.com> wrote:

> Hi Mich / Gourav,
>
> Thanks for your time :) Much appreciated. I went through the article
> shared by Mich about the query execution plan. I have pretty much
> understood everything so far, except for the two things below.
>
> 1) HashAggregate in the plan: does this always indicate "group by"
> columns?
> 2) Predicate pushdown under the optimized logical plan: could you please
> help me understand predicate pushdown with another simple example?
>
> (Both questions are illustrated with short sketches after the quoted
> thread below.)
>
> On Mon, Feb 21, 2022 at 1:52 PM Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I think that the best option is to use the Spark UI. In Spark 3.x the
>> UI and its additional settings are fantastic. Also try the settings for
>> Adaptive Query Execution in Spark; under certain conditions it really
>> works wonders.
>>
>> For certain long queries, depending on how you finally trigger the
>> action of query execution, whether you are using Spark DataFrames or
>> Spark SQL, the settings in Spark (look at the settings for Spark 3.x),
>> and a few other aspects, you will see that the plan is sometimes quite
>> cryptic and difficult to read.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sun, Feb 20, 2022 at 7:32 PM Sid Kal <flinkbyhe...@gmail.com> wrote:
>>
>>> Hi Gourav,
>>>
>>> Right now I am just trying to understand the query execution plan by
>>> executing a simple join example via Spark SQL. The overall goal is to
>>> understand these plans so that, going forward, if my query runs slow
>>> due to data skew or some other issue, I can at least understand what
>>> exactly is happening on the driver and executor sides, as in MapReduce.
>>>
>>> On Sun, Feb 20, 2022 at 9:06 PM Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> What are you trying to achieve by this?
>>>>
>>>> If there is a performance deterioration, try to collect the query
>>>> execution runtime statistics from Spark SQL. They can be seen in the
>>>> Spark SQL UI and are available over APIs, if I am not wrong.
>>>>
>>>> Please ensure that you are not trying to over-automate things.
>>>>
>>>> Reading up on how to understand the plans may be good, depending on
>>>> what you are trying to do.
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Sat, Feb 19, 2022 at 10:00 AM Sid Kal <flinkbyhe...@gmail.com>
>>>> wrote:
>>>>
>>>>> I wrote the query below and I am trying to understand its query
>>>>> execution plan.
>>>>>
>>>>> >>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a join df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
>>>>>
>>>>> == Parsed Logical Plan ==
>>>>> 'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
>>>>> +- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
>>>>>    :- 'SubqueryAlias a
>>>>>    :  +- 'UnresolvedRelation [df], [], false
>>>>>    +- 'SubqueryAlias b
>>>>>       +- 'UnresolvedRelation [df1], [], false
>>>>>
>>>>> == Analyzed Logical Plan ==
>>>>> CustomerID: int, CustomerName: string, state: string
>>>>> Project [CustomerID#640, CustomerName#641, state#988]
>>>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>>>    :- SubqueryAlias a
>>>>>    :  +- SubqueryAlias df
>>>>>    :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
>>>>>    +- SubqueryAlias b
>>>>>       +- SubqueryAlias df1
>>>>>          +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv
>>>>>
>>>>> == Optimized Logical Plan ==
>>>>> Project [CustomerID#640, CustomerName#641, state#988]
>>>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>>>    :- Project [CustomerID#640, CustomerName#641]
>>>>>    :  +- Filter isnotnull(CustomerID#640)
>>>>>    :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
>>>>>    +- Project [CustomerID#978, State#988]
>>>>>       +- Filter isnotnull(CustomerID#978)
>>>>>          +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv
>>>>>
>>>>> == Physical Plan ==
>>>>> *(5) Project [CustomerID#640, CustomerName#641, state#988]
>>>>> +- *(5) SortMergeJoin [CustomerID#640], [CustomerID#978], Inner
>>>>>    :- *(2) Sort [CustomerID#640 ASC NULLS FIRST], false, 0
>>>>>    :  +- Exchange hashpartitioning(CustomerID#640, 200), ENSURE_REQUIREMENTS, [id=#451]
>>>>>    :     +- *(1) Filter isnotnull(CustomerID#640)
>>>>>    :        +- FileScan csv [CustomerID#640,CustomerName#641] Batched: false, DataFilters: [isnotnull(CustomerID#640)], Format: CSV, Location: InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct<CustomerID:int,CustomerName:string>
>>>>>    +- *(4) Sort [CustomerID#978 ASC NULLS FIRST], false, 0
>>>>>       +- Exchange hashpartitioning(CustomerID#978, 200), ENSURE_REQUIREMENTS, [id=#459]
>>>>>          +- *(3) Filter isnotnull(CustomerID#978)
>>>>>             +- FileScan csv [CustomerID#978,State#988] Batched: false, DataFilters: [isnotnull(CustomerID#978)], Format: CSV, Location: InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct<CustomerID:int,State:string>
>>>>>
>>>>> I know some of this already: Project is like the select clause, and
>>>>> Filter represents whatever filters we use in the query. Where can I
>>>>> look for the cost optimization in this plan? Suppose in the future my
>>>>> query takes longer to execute; by looking at this plan, how can I
>>>>> figure out what exactly is happening and what needs to be modified on
>>>>> the query side? Also, Spark uses sort-merge join by default, as I can
>>>>> see from the plan, but when does it opt for a sort-merge join and when
>>>>> for a shuffle-hash join? (A sketch for this also follows after the
>>>>> quoted thread.)
>>>>>
>>>>> Thanks,
>>>>> Sid
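
On question 1 (HashAggregate): it does not always indicate "group by"
columns. HashAggregate is Spark's hash-based aggregation operator; it also
appears for global aggregates with no grouping keys, and a group-by can
instead compile to SortAggregate when the aggregation buffer holds
immutable types (max over a string column, for example). A minimal PySpark
sketch, with made-up data and column names:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[*]").appName("agg-plan").getOrCreate()

    # Made-up data for illustration.
    df = spark.createDataFrame(
        [(1, "MH", 10.0), (2, "KA", 20.0), (3, "KA", 5.0)],
        ["CustomerID", "State", "amount"],
    )

    # Group-by aggregation: the physical plan shows a partial HashAggregate,
    # an Exchange (shuffle), and a final HashAggregate.
    df.groupBy("State").agg(F.sum("amount")).explain()

    # A global aggregate with no GROUP BY also compiles to HashAggregate,
    # so the node alone does not imply grouping columns.
    df.agg(F.sum("amount")).explain()

In the first plan you should see HashAggregate(keys=[State...], ...); in
the second, HashAggregate(keys=[], ...), i.e. the same operator with no
grouping keys. The node name reflects the aggregation strategy, not the
presence of GROUP BY.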
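On question 2 (predicate pushdown): the optimizer moves a filter as close
to the data source as possible, so the scan itself can skip data instead
of Spark reading rows and discarding them afterwards. In the plan above it
is visible twice: as the Filter isnotnull(CustomerID) sitting directly
above each Relation in the optimized logical plan, and as PushedFilters:
[IsNotNull(CustomerID)] inside each FileScan. A minimal sketch, assuming a
hypothetical Parquet path /tmp/customers (with Parquet, unlike CSV, the
pushed filter can actually skip row groups):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").appName("pushdown").getOrCreate()

    # Write a small made-up dataset; the path is hypothetical.
    spark.range(100).withColumnRenamed("id", "CustomerID") \
        .write.mode("overwrite").parquet("/tmp/customers")

    df = spark.read.parquet("/tmp/customers")

    # The filter is written after the read, but the optimizer pushes it
    # down to the scan: look for PushedFilters: [IsNotNull(CustomerID),
    # GreaterThan(CustomerID,50)] in the FileScan node.
    df.filter(df.CustomerID > 50).explain(mode="extended")

The isnotnull filters in your own plan are the same mechanism: the inner
join can never match null keys, so the optimizer adds and pushes down a
null check on CustomerID for both sides.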
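On the join-strategy and cost questions at the end: for an equi-join,
Spark 3.x first considers a broadcast hash join if one side is estimated
below spark.sql.autoBroadcastJoinThreshold (10 MB by default). Otherwise
it prefers a sort-merge join, because spark.sql.join.preferSortMergeJoin
defaults to true; a shuffled hash join is chosen only when that preference
is turned off and one side is small enough to build an in-memory hash map
per partition, or when you request it with a join hint (Spark 3.0+). A
sketch with made-up DataFrames:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").appName("join-strategy").getOrCreate()

    big = spark.range(1_000_000).withColumnRenamed("id", "CustomerID")
    small = spark.range(1_000).withColumnRenamed("id", "CustomerID")

    # Disable auto-broadcast so the small side is not broadcast, which
    # would otherwise hide the SortMergeJoin vs ShuffledHashJoin choice.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

    # Default for an equi-join: SortMergeJoin in the physical plan.
    big.join(small, "CustomerID").explain()

    # Spark 3.0+ join hint requesting a shuffled hash join instead; the
    # plan should now show ShuffledHashJoin.
    big.join(small.hint("shuffle_hash"), "CustomerID").explain()

For the cost side, explain(mode="cost") prints size and row-count
estimates alongside the optimized logical plan, and the SQL tab of the
Spark UI shows per-operator runtime metrics (output rows, shuffle bytes,
spill), which is usually the quickest way to find the slow part of a
long query.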