Is this 2.0.0 or 2.0.1? There are several correctness fixes in the latter.

On Oct 22, 2016 2:14 PM, "Jeremy Davis" <jerda...@speakeasy.net> wrote:
>
> Hello, I ran into a bug with Broadcast Hash Join in Spark 2.0. (Running
> on EMR)
> If I just toggle spark.sql.autoBroadcastJoinThreshold=-1 then the join
> works; if I leave it at the default, it does not work.
> When it doesn’t work, one of my joined columns is filled with very
> small Doubles.
>
> I’m joining two small tables: (datetime,spx) and (datetime,vix)
> Attached are the plans and debug.
>
>
> ================================================================
>
> The (Default) Broken case:
>
>
> +-------------+-----------+
> |     datetime|        spx|
> +-------------+-----------+
> |1476907200000|2144.290039|
> |1476820800000|2139.600098|
> |1476734400000|     2126.5|
> |1476475200000| 2132.97998|
> |1476388800000|2132.550049|
> |1476302400000|2139.179932|
> |1476216000000| 2136.72998|
> |1476129600000|2163.659912|
> |1475870400000| 2153.73999|
> |1475784000000| 2160.77002|
> |1475697600000| 2159.72998|
> |1475611200000| 2150.48999|
> |1475524800000|2161.199951|
> |1475265600000| 2168.27002|
> |1475179200000|2151.129883|
> |1475092800000|2171.370117|
> |1475006400000|2159.929932|
> |1474920000000|2146.100098|
> |1474660800000|2164.689941|
> |1474574400000|2177.179932|
> +-------------+-----------+
> only showing top 20 rows
>
> +-------------+---------+
> |     datetime|      vix|
> +-------------+---------+
> |1476907200000|    14.41|
> |1476820800000|    15.28|
> |1476734400000|16.209999|
> |1476475200000|16.120001|
> |1476388800000|16.690001|
> |1476302400000|    15.91|
> |1476216000000|    15.36|
> |1476129600000|    13.38|
> |1475870400000|    13.48|
> |1475784000000|    12.84|
> |1475697600000|    12.99|
> |1475611200000|    13.63|
> |1475524800000|    13.57|
> |1475265600000|    13.29|
> |1475179200000|    14.02|
> |1475092800000|    12.39|
> |1475006400000|     13.1|
> |1474920000000|     14.5|
> |1474660800000|    12.29|
> |1474574400000|    12.02|
> +-------------+---------+
> only showing top 20 rows
>
> 2016-10-22T20:50:31.345+0000: [GC (Allocation Failure) [PSYoungGen: 704134K->79382K(945664K)]
823872K->199145K(3089408K), 0.0285894 secs] [Times: > user=0.29 sys=0.04, real=0.03 secs] > == Physical Plan == > *Project [datetime#34L, spx#25, vix#72] > +- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight > :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS > datetime#34L, cast(Close#4 as double) AS spx#25] > : +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if > (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || > isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || > isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) > : +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: > s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), > Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string> > +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, > true])) > +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS > datetime#81L, cast(Close#51 as 
double) AS vix#72] > +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && > isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) > as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as > int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as > int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as > int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) > +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: > s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), > Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string> > ######## > == Parsed Logical Plan == > 'Join UsingJoin(Inner,List('datetime)) > :- Project [datetime#34L, spx#25] > : +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, > if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || > isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || > isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L] > : +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS > spx#25, Volume#5, Adj Close#6] > : +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, > Volume#5, Adj Close#6] > : +- Filter NOT (Date#0 = Date) > : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj > Close#6] csv > +- Project [datetime#81L, vix#72] > +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj > Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) > as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as > int))) || isnull(cast(cast(substring(date#47, 9, 2) as 
decimal(20,0)) as > int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as > int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L] > +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) > AS vix#72, Volume#52, Adj Close#53] > +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, > Volume#52, Adj Close#53] > +- Filter NOT (Date#47 = Date) > +- > Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv > > == Analyzed Logical Plan == > datetime: bigint, spx: double, vix: double > Project [datetime#34L, spx#25, vix#72] > +- Join Inner, (datetime#34L = datetime#81L) > :- Project [datetime#34L, spx#25] > : +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj > Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as > int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) > || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L] > : +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS > spx#25, Volume#5, Adj Close#6] > : +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, > Volume#5, Adj Close#6] > : +- Filter NOT (Date#0 = Date) > : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj > Close#6] csv > +- Project [datetime#81L, vix#72] > +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj > Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) > as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as > int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as > int)))) null else UDF(cast(cast(substring(date#47, 1, 
4) as decimal(20,0)) as > int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L] > +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as > double) AS vix#72, Volume#52, Adj Close#53] > +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS > vix#63, Volume#52, Adj Close#53] > +- Filter NOT (Date#47 = Date) > +- > Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv > > == Optimized Logical Plan == > Project [datetime#34L, spx#25, vix#72] > +- Join Inner, (datetime#34L = datetime#81L) > :- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS > datetime#34L, cast(Close#4 as double) AS spx#25] > : +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if > (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || > isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || > isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) > : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] > csv > +- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as > 
decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS > datetime#81L, cast(Close#51 as double) AS vix#72] > +- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if > (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || > isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || > isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) > +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj > Close#53] csv > > == Physical Plan == > *Project [datetime#34L, spx#25, vix#72] > +- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight > :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS > datetime#34L, cast(Close#4 as double) AS spx#25] > : +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if > (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || > isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || > isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) > : +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: > 
s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), > Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string> > +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, > true])) > +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS > datetime#81L, cast(Close#51 as double) AS vix#72] > +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && > isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) > as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as > int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as > int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as > int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) > +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: > s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), > Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string> > ######## > +-------------+-----------+-------------------+ > | datetime| spx| vix| > +-------------+-----------+-------------------+ > |1476907200000|2144.290039|7.296891096156E-312| > |1476820800000|2139.600098| 7.29646422344E-312| > |1476734400000| 2126.5| 7.29603735072E-312| > |1476475200000| 2132.97998|7.294756732566E-312| > |1476388800000|2132.550049| 7.29432985985E-312| > |1476302400000|2139.179932| 7.29390298713E-312| > |1476216000000| 2136.72998| 7.29347611441E-312| > |1476129600000|2163.659912|7.293049241694E-312| > |1475870400000| 2153.73999| 
7.29176862354E-312|
> |1475784000000| 2160.77002| 7.29134175082E-312|
> |1475697600000| 2159.72998|7.290914878104E-312|
> |1475611200000| 2150.48999|7.290488005386E-312|
> |1475524800000|2161.199951| 7.29006113267E-312|
> |1475265600000| 2168.27002|7.288780514514E-312|
> |1475179200000|2151.129883|7.288353641796E-312|
> |1475092800000|2171.370117| 7.28792676908E-312|
> |1475006400000|2159.929932| 7.28749989636E-312|
> |1474920000000|2146.100098| 7.28707302364E-312|
> |1474660800000|2164.689941| 7.28579240549E-312|
> |1474574400000|2177.179932| 7.28536553277E-312|
> +-------------+-----------+-------------------+
>
>
>
>
>
> ============================================================
> =================
> ============================================================
> =================
> ============================================================
> =================
>
> --conf,spark.sql.autoBroadcastJoinThreshold=-1,
>
>
>
>
> +-------------+-----------+
> |     datetime|        spx|
> +-------------+-----------+
> |1476907200000|2144.290039|
> |1476820800000|2139.600098|
> |1476734400000|     2126.5|
> |1476475200000| 2132.97998|
> |1476388800000|2132.550049|
> |1476302400000|2139.179932|
> |1476216000000| 2136.72998|
> |1476129600000|2163.659912|
> |1475870400000| 2153.73999|
> |1475784000000| 2160.77002|
> |1475697600000| 2159.72998|
> |1475611200000| 2150.48999|
> |1475524800000|2161.199951|
> |1475265600000| 2168.27002|
> |1475179200000|2151.129883|
> |1475092800000|2171.370117|
> |1475006400000|2159.929932|
> |1474920000000|2146.100098|
> |1474660800000|2164.689941|
> |1474574400000|2177.179932|
> +-------------+-----------+
> only showing top 20 rows
>
> +-------------+---------+
> |     datetime|      vix|
> +-------------+---------+
> |1476907200000|    14.41|
> |1476820800000|    15.28|
> |1476734400000|16.209999|
> |1476475200000|16.120001|
> |1476388800000|16.690001|
> |1476302400000|    15.91|
> |1476216000000|    15.36|
> |1476129600000|    13.38|
> |1475870400000|    13.48|
> |1475784000000|
12.84| > |1475697600000| 12.99| > |1475611200000| 13.63| > |1475524800000| 13.57| > |1475265600000| 13.29| > |1475179200000| 14.02| > |1475092800000| 12.39| > |1475006400000| 13.1| > |1474920000000| 14.5| > |1474660800000| 12.29| > |1474574400000| 12.02| > +-------------+---------+ > only showing top 20 rows > > == Physical Plan == > *Project [datetime#34L, spx#25, vix#72] > +- *SortMergeJoin [datetime#34L], [datetime#81L], Inner > :- *Sort [datetime#34L ASC], false, 0 > : +- Exchange hashpartitioning(datetime#34L, 200) > : +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS > datetime#34L, cast(Close#4 as double) AS spx#25] > : +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && > isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as > int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) > || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) > : +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: > s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), > Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string> > +- *Sort [datetime#81L ASC], false, 0 > +- Exchange hashpartitioning(datetime#81L, 200) > +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as > decimal(20,0)) as int))) || 
isnull(cast(cast(substring(date#47, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS > datetime#81L, cast(Close#51 as double) AS vix#72] > +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && > isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) > as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as > int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as > int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as > int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) > +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: > s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), > Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string> > ######## > 2016-10-22T20:58:15.994+0000: [GC (Allocation Failure) [PSYoungGen: > 705910K->79079K(999936K)] 824644K->197829K(2974208K), 0.0294130 secs] [Times: > user=0.26 sys=0.04, real=0.03 secs] > == Parsed Logical Plan == > 'Join UsingJoin(Inner,List('datetime)) > :- Project [datetime#34L, spx#25] > : +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, > if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || > isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || > isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L] > : +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS > spx#25, Volume#5, Adj Close#6] > : +- Project [Date#0, 
Open#1, High#2, Low#3, Close#4 AS spx#16, > Volume#5, Adj Close#6] > : +- Filter NOT (Date#0 = Date) > : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj > Close#6] csv > +- Project [datetime#81L, vix#72] > +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj > Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) > as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as > int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as > int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as > int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L] > +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) > AS vix#72, Volume#52, Adj Close#53] > +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63, > Volume#52, Adj Close#53] > +- Filter NOT (Date#47 = Date) > +- > Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv > > == Analyzed Logical Plan == > datetime: bigint, spx: double, vix: double > Project [datetime#34L, spx#25, vix#72] > +- Join Inner, (datetime#34L = datetime#81L) > :- Project [datetime#34L, spx#25] > : +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj > Close#6, if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as > int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) > || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L] > : +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS > spx#25, Volume#5, Adj Close#6] > : +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16, > Volume#5, Adj Close#6] > : +- 
Filter NOT (Date#0 = Date) > : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj > Close#6] csv > +- Project [datetime#81L, vix#72] > +- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj > Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) > as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as > int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as > int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as > int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L] > +- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as > double) AS vix#72, Volume#52, Adj Close#53] > +- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS > vix#63, Volume#52, Adj Close#53] > +- Filter NOT (Date#47 = Date) > +- > Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv > > == Optimized Logical Plan == > Project [datetime#34L, spx#25, vix#72] > +- Join Inner, (datetime#34L = datetime#81L) > :- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS > datetime#34L, cast(Close#4 as double) AS spx#25] > : +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if > (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || > isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || > isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), > 
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) > : +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6] > csv > +- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS > datetime#81L, cast(Close#51 as double) AS vix#72] > +- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if > (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) || > isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) || > isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) > +- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj > Close#53] csv > > == Physical Plan == > *Project [datetime#34L, spx#25, vix#72] > +- *SortMergeJoin [datetime#34L], [datetime#81L], Inner > :- *Sort [datetime#34L ASC], false, 0 > : +- Exchange hashpartitioning(datetime#34L, 200) > : +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS > datetime#34L, cast(Close#4 as double) 
AS spx#25] > : +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && > isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as > int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) > || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null > else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), > cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) > : +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: > s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), > Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string> > +- *Sort [datetime#81L ASC], false, 0 > +- Exchange hashpartitioning(datetime#81L, 200) > +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as > decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as > decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as > decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) > as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS > datetime#81L, cast(Close#51 as double) AS vix#72] > +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && > isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) > as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as > int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as > int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as > int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), > cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) > +- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: > s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), > Not(EqualTo(Date,Date))], ReadSchema: 
struct<Date:string,Close:string>
> ########
> +-------------+-----------+---------+
> |     datetime|        spx|      vix|
> +-------------+-----------+---------+
> | 931550400000|1403.280029|17.959999|
> | 955742400000|1356.560059|33.490002|
> | 962308800000|1442.390015|19.700001|
> | 967752000000|1517.680054|    16.84|
> | 995054400000|1215.680054|21.139999|
> |1028145600000| 911.619995|32.029999|
> |1049832000000| 878.289978|27.129999|
> |1088452800000|1133.349976|    16.07|
> |1097265600000|1122.140015|    15.05|
> |1102539600000|1182.810059|    13.19|
> |1147809600000|1292.079956|    13.35|
> |1162414800000|1367.810059|    11.51|
> |1266526800000|    1106.75|20.629999|
> |1314043200000|1123.819946|42.439999|
> |1319227200000|    1238.25|    31.32|
> |1331928000000|1404.170044|    14.43|
> |1377201600000|1656.959961|    14.76|
> |1378756800000|1671.709961|    15.63|
> |1390597200000|1790.290039|17.879999|
> |1400616000000|1872.829956|    12.96|
> +-------------+-----------+---------+
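
One observation on the broken output quoted above, offered as a guess rather than a diagnosis: the very small Doubles in the vix column look like the datetime value of the same row, with its 64 bits reinterpreted as a double. The Python sketch below checks that idea against the first three rows of the broken result. The helper name long_bits_as_double is mine and is not taken from Spark; the sample pairs are copied from the quoted table.

```python
import struct


def long_bits_as_double(value: int) -> float:
    """Reinterpret the 8 bytes of a signed 64-bit integer as an IEEE 754 double."""
    return struct.unpack("<d", struct.pack("<q", value))[0]


# (datetime, vix) pairs copied from the broken join output quoted above.
broken_rows = [
    (1476907200000, 7.296891096156e-312),
    (1476820800000, 7.29646422344e-312),
    (1476734400000, 7.29603735072e-312),
]

for datetime_ms, bad_vix in broken_rows:
    reinterpreted = long_bits_as_double(datetime_ms)
    # Prints the key, the reinterpreted value, and whether it equals the bad vix value.
    print(datetime_ms, reinterpreted, reinterpreted == bad_vix)
```

If the comparison holds for these rows, it would suggest the broadcast join is returning the bytes of the join key where the vix value should be, but that is an assumption on my part and not something the plans above confirm.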