Hello, I ran into a bug with broadcast hash join in Spark 2.0 (running on EMR). If I set spark.sql.autoBroadcastJoinThreshold=-1, the join works; if I leave the setting at its default, it does not. In the broken case, one of the joined columns is filled with very small Doubles.
I’m joining two small tables: (datetime, spx) and (datetime, vix).
The query plans and debug output for both cases follow.
================================================================
The (Default) Broken case:
+-------------+-----------+
| datetime| spx|
+-------------+-----------+
|1476907200000|2144.290039|
|1476820800000|2139.600098|
|1476734400000| 2126.5|
|1476475200000| 2132.97998|
|1476388800000|2132.550049|
|1476302400000|2139.179932|
|1476216000000| 2136.72998|
|1476129600000|2163.659912|
|1475870400000| 2153.73999|
|1475784000000| 2160.77002|
|1475697600000| 2159.72998|
|1475611200000| 2150.48999|
|1475524800000|2161.199951|
|1475265600000| 2168.27002|
|1475179200000|2151.129883|
|1475092800000|2171.370117|
|1475006400000|2159.929932|
|1474920000000|2146.100098|
|1474660800000|2164.689941|
|1474574400000|2177.179932|
+-------------+-----------+
only showing top 20 rows
+-------------+---------+
| datetime| vix|
+-------------+---------+
|1476907200000| 14.41|
|1476820800000| 15.28|
|1476734400000|16.209999|
|1476475200000|16.120001|
|1476388800000|16.690001|
|1476302400000| 15.91|
|1476216000000| 15.36|
|1476129600000| 13.38|
|1475870400000| 13.48|
|1475784000000| 12.84|
|1475697600000| 12.99|
|1475611200000| 13.63|
|1475524800000| 13.57|
|1475265600000| 13.29|
|1475179200000| 14.02|
|1475092800000| 12.39|
|1475006400000| 13.1|
|1474920000000| 14.5|
|1474660800000| 12.29|
|1474574400000| 12.02|
+-------------+---------+
only showing top 20 rows
2016-10-22T20:50:31.345+0000: [GC (Allocation Failure) [PSYoungGen:
704134K->79382K(945664K)] 823872K->199145K(3089408K), 0.0285894 secs] [Times:
user=0.29 sys=0.04, real=0.03 secs]
== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
:- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as
decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as
decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as
int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS
datetime#34L, cast(Close#4 as double) AS spx#25]
: +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if
(((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) ||
isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) ||
isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else
UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths:
s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date),
Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint,
true]))
+- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as
decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as
int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS
datetime#81L, cast(Close#51 as double) AS vix#72]
+- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) &&
isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as
int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int)))
|| isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths:
s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date),
Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('datetime))
:- Project [datetime#34L, spx#25]
: +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if
(((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) ||
isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) ||
isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else
UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
: +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS
spx#25, Volume#5, Adj Close#6]
: +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16,
Volume#5, Adj Close#6]
: +- Filter NOT (Date#0 = Date)
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj
Close#6] csv
+- Project [datetime#81L, vix#72]
+- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj
Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as
int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int)))
|| isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
+- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS
vix#72, Volume#52, Adj Close#53]
+- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63,
Volume#52, Adj Close#53]
+- Filter NOT (Date#47 = Date)
+-
Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
== Analyzed Logical Plan ==
datetime: bigint, spx: double, vix: double
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
:- Project [datetime#34L, spx#25]
: +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6,
if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) ||
isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) ||
isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else
UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
: +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS
spx#25, Volume#5, Adj Close#6]
: +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16,
Volume#5, Adj Close#6]
: +- Filter NOT (Date#0 = Date)
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj
Close#6] csv
+- Project [datetime#81L, vix#72]
+- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj
Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as
int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int)))
|| isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
+- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double)
AS vix#72, Volume#52, Adj Close#53]
+- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63,
Volume#52, Adj Close#53]
+- Filter NOT (Date#47 = Date)
+-
Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
== Optimized Logical Plan ==
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
:- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0))
as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as
int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L,
cast(Close#4 as double) AS spx#25]
: +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if
(((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) ||
isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) ||
isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else
UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6]
csv
+- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as
decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as
int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS
datetime#81L, cast(Close#51 as double) AS vix#72]
+- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if
(((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) ||
isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) ||
isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj
Close#53] csv
== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
:- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as
decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as
decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as
int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS
datetime#34L, cast(Close#4 as double) AS spx#25]
: +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if
(((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) ||
isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) ||
isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else
UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths:
s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date),
Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint,
true]))
+- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as
decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as
int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS
datetime#81L, cast(Close#51 as double) AS vix#72]
+- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) &&
isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as
int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int)))
|| isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths:
s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date),
Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
+-------------+-----------+-------------------+
| datetime| spx| vix|
+-------------+-----------+-------------------+
|1476907200000|2144.290039|7.296891096156E-312|
|1476820800000|2139.600098| 7.29646422344E-312|
|1476734400000| 2126.5| 7.29603735072E-312|
|1476475200000| 2132.97998|7.294756732566E-312|
|1476388800000|2132.550049| 7.29432985985E-312|
|1476302400000|2139.179932| 7.29390298713E-312|
|1476216000000| 2136.72998| 7.29347611441E-312|
|1476129600000|2163.659912|7.293049241694E-312|
|1475870400000| 2153.73999| 7.29176862354E-312|
|1475784000000| 2160.77002| 7.29134175082E-312|
|1475697600000| 2159.72998|7.290914878104E-312|
|1475611200000| 2150.48999|7.290488005386E-312|
|1475524800000|2161.199951| 7.29006113267E-312|
|1475265600000| 2168.27002|7.288780514514E-312|
|1475179200000|2151.129883|7.288353641796E-312|
|1475092800000|2171.370117| 7.28792676908E-312|
|1475006400000|2159.929932| 7.28749989636E-312|
|1474920000000|2146.100098| 7.28707302364E-312|
|1474660800000|2164.689941| 7.28579240549E-312|
|1474574400000|2177.179932| 7.28536553277E-312|
+-------------+-----------+-------------------+
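One observation on the broken output above: each bad vix value looks like the 64-bit pattern of that row's datetime key read back as a double, which would suggest the join is returning the key's bytes in place of the vix column. A minimal sketch to check this on the first three rows, using Python's struct module purely for illustration (it is not part of the Spark job, and the helper names are my own):

```python
import struct


def long_bits_to_double(n: int) -> float:
    """Reinterpret the 8 bytes of a signed 64-bit integer as an IEEE 754 double."""
    return struct.unpack("<d", struct.pack("<q", n))[0]


def double_to_long_bits(x: float) -> int:
    """Inverse: read the 8 bytes of a double back as a signed 64-bit integer."""
    return struct.unpack("<q", struct.pack("<d", x))[0]


# (datetime key, broken vix value) pairs copied from the first rows of the table above
rows = [
    (1476907200000, 7.296891096156E-312),
    (1476820800000, 7.29646422344E-312),
    (1476734400000, 7.29603735072E-312),
]

for datetime_key, broken_vix in rows:
    reinterpreted = long_bits_to_double(datetime_key)
    # Compare the reinterpreted key against the value Spark printed in the vix column
    print(datetime_key, reinterpreted, reinterpreted == broken_vix)
```

If the comparisons print True, the input data itself is probably fine and the problem is more likely in how the vix field is read from the broadcast side, though that is a guess and not a confirmed cause.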
=============================================================================
=============================================================================
=============================================================================
The working case, run with --conf spark.sql.autoBroadcastJoinThreshold=-1:
+-------------+-----------+
| datetime| spx|
+-------------+-----------+
|1476907200000|2144.290039|
|1476820800000|2139.600098|
|1476734400000| 2126.5|
|1476475200000| 2132.97998|
|1476388800000|2132.550049|
|1476302400000|2139.179932|
|1476216000000| 2136.72998|
|1476129600000|2163.659912|
|1475870400000| 2153.73999|
|1475784000000| 2160.77002|
|1475697600000| 2159.72998|
|1475611200000| 2150.48999|
|1475524800000|2161.199951|
|1475265600000| 2168.27002|
|1475179200000|2151.129883|
|1475092800000|2171.370117|
|1475006400000|2159.929932|
|1474920000000|2146.100098|
|1474660800000|2164.689941|
|1474574400000|2177.179932|
+-------------+-----------+
only showing top 20 rows
+-------------+---------+
| datetime| vix|
+-------------+---------+
|1476907200000| 14.41|
|1476820800000| 15.28|
|1476734400000|16.209999|
|1476475200000|16.120001|
|1476388800000|16.690001|
|1476302400000| 15.91|
|1476216000000| 15.36|
|1476129600000| 13.38|
|1475870400000| 13.48|
|1475784000000| 12.84|
|1475697600000| 12.99|
|1475611200000| 13.63|
|1475524800000| 13.57|
|1475265600000| 13.29|
|1475179200000| 14.02|
|1475092800000| 12.39|
|1475006400000| 13.1|
|1474920000000| 14.5|
|1474660800000| 12.29|
|1474574400000| 12.02|
+-------------+---------+
only showing top 20 rows
== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
:- *Sort [datetime#34L ASC], false, 0
: +- Exchange hashpartitioning(datetime#34L, 200)
: +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as
decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as
decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as
int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS
datetime#34L, cast(Close#4 as double) AS spx#25]
: +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) &&
isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as
int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int)))
|| isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths:
s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date),
Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
+- *Sort [datetime#81L ASC], false, 0
+- Exchange hashpartitioning(datetime#81L, 200)
+- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as
decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as
int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS
datetime#81L, cast(Close#51 as double) AS vix#72]
+- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) &&
isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as
int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int)))
|| isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths:
s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date),
Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
2016-10-22T20:58:15.994+0000: [GC (Allocation Failure) [PSYoungGen:
705910K->79079K(999936K)] 824644K->197829K(2974208K), 0.0294130 secs] [Times:
user=0.26 sys=0.04, real=0.03 secs]
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('datetime))
:- Project [datetime#34L, spx#25]
: +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if
(((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) ||
isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) ||
isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else
UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
: +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS
spx#25, Volume#5, Adj Close#6]
: +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16,
Volume#5, Adj Close#6]
: +- Filter NOT (Date#0 = Date)
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj
Close#6] csv
+- Project [datetime#81L, vix#72]
+- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj
Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as
int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int)))
|| isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
+- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double) AS
vix#72, Volume#52, Adj Close#53]
+- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63,
Volume#52, Adj Close#53]
+- Filter NOT (Date#47 = Date)
+-
Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
== Analyzed Logical Plan ==
datetime: bigint, spx: double, vix: double
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
:- Project [datetime#34L, spx#25]
: +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6,
if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) ||
isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) ||
isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else
UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L]
: +- Project [Date#0, Open#1, High#2, Low#3, cast(spx#16 as double) AS
spx#25, Volume#5, Adj Close#6]
: +- Project [Date#0, Open#1, High#2, Low#3, Close#4 AS spx#16,
Volume#5, Adj Close#6]
: +- Filter NOT (Date#0 = Date)
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj
Close#6] csv
+- Project [datetime#81L, vix#72]
+- Project [Date#47, Open#48, High#49, Low#50, vix#72, Volume#52, Adj
Close#53, if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as
int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int)))
|| isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS datetime#81L]
+- Project [Date#47, Open#48, High#49, Low#50, cast(vix#63 as double)
AS vix#72, Volume#52, Adj Close#53]
+- Project [Date#47, Open#48, High#49, Low#50, Close#51 AS vix#63,
Volume#52, Adj Close#53]
+- Filter NOT (Date#47 = Date)
+-
Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj Close#53] csv
== Optimized Logical Plan ==
Project [datetime#34L, spx#25, vix#72]
+- Join Inner, (datetime#34L = datetime#81L)
:- Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0))
as int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as
int))) || isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
null else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS datetime#34L,
cast(Close#4 as double) AS spx#25]
: +- Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if
(((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) ||
isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) ||
isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null else
UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- Relation[Date#0,Open#1,High#2,Low#3,Close#4,Volume#5,Adj Close#6]
csv
+- Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as
decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as
int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS
datetime#81L, cast(Close#51 as double) AS vix#72]
+- Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && isnotnull(if
(((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int)) ||
isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) ||
isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- Relation[Date#47,Open#48,High#49,Low#50,Close#51,Volume#52,Adj
Close#53] csv
== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *SortMergeJoin [datetime#34L], [datetime#81L], Inner
:- *Sort [datetime#34L ASC], false, 0
: +- Exchange hashpartitioning(datetime#34L, 200)
: +- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as
decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#0, 1, 4) as
decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as
int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS
datetime#34L, cast(Close#4 as double) AS spx#25]
: +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) &&
isnotnull(if (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as
int)) || isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int)))
|| isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int))))
: +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths:
s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date),
Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
+- *Sort [datetime#81L ASC], false, 0
+- Exchange hashpartitioning(datetime#81L, 200)
+- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as
decimal(20,0)) as int)))) null else UDF(cast(cast(substring(date#47, 1, 4) as
decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as
int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS
datetime#81L, cast(Close#51 as double) AS vix#72]
+- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) &&
isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as
int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int)))
|| isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)))) null
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int),
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int),
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int))))
+- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths:
s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date),
Not(EqualTo(Date,Date))], ReadSchema: struct<Date:string,Close:string>
########
+-------------+-----------+---------+
| datetime| spx| vix|
+-------------+-----------+---------+
| 931550400000|1403.280029|17.959999|
| 955742400000|1356.560059|33.490002|
| 962308800000|1442.390015|19.700001|
| 967752000000|1517.680054| 16.84|
| 995054400000|1215.680054|21.139999|
|1028145600000| 911.619995|32.029999|
|1049832000000| 878.289978|27.129999|
|1088452800000|1133.349976| 16.07|
|1097265600000|1122.140015| 15.05|
|1102539600000|1182.810059| 13.19|
|1147809600000|1292.079956| 13.35|
|1162414800000|1367.810059| 11.51|
|1266526800000| 1106.75|20.629999|
|1314043200000|1123.819946|42.439999|
|1319227200000| 1238.25| 31.32|
|1331928000000|1404.170044| 14.43|
|1377201600000|1656.959961| 14.76|
|1378756800000|1671.709961| 15.63|
|1390597200000|1790.290039|17.879999|
|1400616000000|1872.829956| 12.96|
+-------------+-----------+---------+
