Hi all,

I have a question about the following query:
```
import org.apache.spark.sql.SaveMode

// sql#1: join today's customer dim partition to the device transactions,
// keeping one row per distinct (customer, device, country).
spark.sql("""
  select distinct
    cust_id,
    cast(b.device_name as varchar(200)) as devc_name_cast,
    prmry_reside_cntry_code
  from (select * from ${model_db}.crs_recent_30d_SF_dim_cust_info where dt = '${today}') a
  join fact_rsk_magnes_txn b on a.cust_id = b.customer_id
  where b.device_name <> '#'
    and b.device_name is not null
    and b.device_name <> '~'
""").registerTempTable("device_driver_info_0")

// sql#2: normalize the device name.
spark.sql("""
  select *,
         lower(regexp_replace(devc_name_cast, 'â', 'a')) as devc_name_norm
  from device_driver_info_0
""").registerTempTable("device_driver_info_1")

// sql#3: build the derived Device_Name_Country key.
spark.sql("""
  select cust_id,
         devc_name_norm || '_' || prmry_reside_cntry_code as Device_Name_Country
  from device_driver_info_1
  where dt = '${today}'
""").registerTempTable("device_driver_info")

// sql#4: dedupe (cust_id, Device_Name_Country) pairs.
spark.sql("""
  select cust_id,
         Device_Name_Country
  from device_driver_info
  where Device_Name_Country is not null
  group by 1, 2
""").registerTempTable("device_name_SF_final_acct_info")

// sql#5: count distinct customers per Device_Name_Country.
spark.sql("""
  select Device_Name_Country,
         count(distinct cust_id) as cust_cnt
  from device_name_SF_final_acct_info
  group by 1
""").registerTempTable("device_count_1")

// keep only values shared by 5 to 5000 customers
spark.sql("""
  select * from device_count_1 where cust_cnt between 5 and 5000
""").registerTempTable("device_count")

// sql#6: write the final (cust_id, network_source, network_source_value) rows.
spark.sql("""
  select b.cust_id,
         cast('Device_Name_Country' as varchar(100)) network_source,
         cast(a.Device_Name_Country as varchar(100)) as network_source_value
  from device_count a
  left join device_name_SF_final_acct_info b
    on a.Device_Name_Country = b.Device_Name_Country
""").write
  .mode(SaveMode.Overwrite)
  .insertInto(s"$databaseName.$tableName")
```
The problem: Device_Name_Country is built as devc_name_norm || '_' || prmry_reside_cntry_code in sql#3, but that projection never appears in the physical plan fragment below, so the attribute cannot be bound and the job fails. The same SQL runs successfully on Spark 2, but on Spark 3.1.2 it throws the error shown further down. Please help.
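
In case it helps anyone reproduce this without our tables, here is a trimmed-down sketch of the same chain of steps. The dim_cust / fact_txn views and the two sample rows are made up for illustration only (I have run the full job above, not this exact snippet):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("device_name_country_sketch").getOrCreate()
import spark.implicits._

// Stand-ins for crs_recent_30d_SF_dim_cust_info and fact_rsk_magnes_txn,
// trimmed to the columns the query actually uses.
Seq(("c1", "US"), ("c2", "GB"))
  .toDF("cust_id", "prmry_reside_cntry_code")
  .createOrReplaceTempView("dim_cust")
Seq(("c1", "iPhone"), ("c2", "Pixel 7"), ("c2", "#"))
  .toDF("customer_id", "device_name")
  .createOrReplaceTempView("fact_txn")

// sql#1: join and filter out placeholder device names.
spark.sql("""
  select distinct a.cust_id,
         cast(b.device_name as string) as devc_name_cast,
         a.prmry_reside_cntry_code
  from dim_cust a
  join fact_txn b on a.cust_id = b.customer_id
  where b.device_name <> '#' and b.device_name is not null and b.device_name <> '~'
""").createOrReplaceTempView("d0")

// sql#2 + sql#3: normalize the device name and build Device_Name_Country.
spark.sql("""
  select cust_id,
         lower(regexp_replace(devc_name_cast, 'â', 'a')) || '_' || prmry_reside_cntry_code as Device_Name_Country
  from d0
""").createOrReplaceTempView("d1")

// sql#4: the group-by whose key the error later fails to bind.
spark.sql("""
  select cust_id, Device_Name_Country
  from d1
  where Device_Name_Country is not null
  group by 1, 2
""").show()
```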
```
ShuffleQueryStage 6
+- Exchange hashpartitioning(cust_id#4030, Device_Name_Country#4099, 3001), ENSURE_REQUIREMENTS, [id=#2669]
+- *(5) HashAggregate(keys=[cust_id#4030, Device_Name_Country#4099], functions=[], output=[cust_id#4030, Device_Name_Country#4099])
+- CustomShuffleReader coalesced
+- ShuffleQueryStage 3
+- Exchange hashpartitioning(cust_id#4030, devc_name_cast#4029, prmry_reside_cntry_code#4036, 3001), ENSURE_REQUIREMENTS, [id=#2376]
+- *(3) HashAggregate(keys=[cust_id#4030, devc_name_cast#4029, prmry_reside_cntry_code#4036], functions=[], output=[cust_id#4030, devc_name_cast#4029, prmry_reside_cntry_code#4036])
+- *(3) Project [cust_id#4030, device_name#3453 AS devc_name_cast#4029, prmry_reside_cntry_code#4036]
+- *(3) BroadcastHashJoin [cust_id#4030], [customer_id#3431], Inner, BuildLeft, isnotnull(concat(concat(lower(regexp_replace(device_name#3453, â, a, 1)), _), prmry_reside_cntry_code#4036)), false
:- BroadcastQueryStage 0
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#2132]
: +- *(1) Filter isnotnull(cust_id#4030)
: +- Scan hive unified_group_review_cri_group.crs_recent_30d_sf_dim_cust_info [cust_id#4030, prmry_reside_cntry_code#4036], HiveTableRelation [`unified_group_review_cri_group`.`crs_recent_30d_sf_dim_cust_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [cust_id#4030, acct_cre_dt#4031, is_guest_y_n#4032, prmry_email_domain#4033, cust_first_name#4034..., Partition Cols: [dt#4048], Pruned Partitions: [(dt=2023-03-23)]], [isnotnull(dt#4048), (dt#4048 = 2023-03-23)]
+- *(3) Project [customer_id#3431, device_name#3453]
+- *(3) Filter (((NOT (device_name#3453 = #) AND isnotnull(device_name#3453)) AND NOT (device_name#3453 = ~)) AND isnotnull(customer_id#3431))
+- *(3) ColumnarToRow
+- FileScan parquet pp_risk_ops_qh_tables.magnes_fraudnet_login_raw[customer_id#3431,device_name#3453,ts#3603,event_dt#3604] Batched: true, DataFilters: [NOT (device_name#3453 = #), isnotnull(device_name#3453), NOT (device_name#3453 = ~), isnotnull(c..., Format: Parquet, Location: InMemoryFileIndex[gs://pypl-bkt-prd-row-std-gds-non-edw-tables/apps/risk/ads/rda/magnes_fraudnet_..., PartitionFilters: [isnotnull(event_dt#3604), (cast(event_dt#3604 as date) >= 19409), (event_dt#3604 <= 2023-03-23)], PushedFilters: [Not(EqualTo(device_name,#)), IsNotNull(device_name), Not(EqualTo(device_name,~)), IsNotNull(cust..., ReadSchema: struct<customer_id:string,device_name:string>
```
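
Since the failure happens while materializing a ShuffleQueryStage under AdaptiveSparkPlanExec, would it make sense to rerun the job with adaptive execution disabled to narrow it down? Something along these lines (just an idea; I have not verified that it avoids the error):

```scala
// Diagnostic only: disable adaptive query execution for this session before
// running the queries above, to check whether the binding failure is
// AQE-specific on Spark 3.1.2.
spark.conf.set("spark.sql.adaptive.enabled", "false")
```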
The full error and stack trace:
```
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
ShuffleQueryStage 6
+- Exchange hashpartitioning(cust_id#13555, Device_Name_Country#13624, 3001), ENSURE_REQUIREMENTS, [id=#23666]
+- *(5) HashAggregate(keys=[cust_id#13555, Device_Name_Country#13624], functions=[], output=[cust_id#13555, Device_Name_Country#13624])
+- CustomShuffleReader coalesced
+- ShuffleQueryStage 3
+- Exchange hashpartitioning(cust_id#13555, devc_name_cast#13554, prmry_reside_cntry_code#13561, 3001), ENSURE_REQUIREMENTS, [id=#22991]
+- *(3) HashAggregate(keys=[cust_id#13555, devc_name_cast#13554, prmry_reside_cntry_code#13561], functions=[], output=[cust_id#13555, devc_name_cast#13554, prmry_reside_cntry_code#13561])
+- *(3) Project [cust_id#13555, device_name#11949 AS devc_name_cast#13554, prmry_reside_cntry_code#13561]
+- *(3) BroadcastHashJoin [cust_id#13555], [customer_id#11927], Inner, BuildLeft, isnotnull(concat(concat(lower(regexp_replace(device_name#11949, â, a, 1)), _), prmry_reside_cntry_code#13561)), false
:- BroadcastQueryStage 0
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#22650]
: +- *(1) Filter isnotnull(cust_id#13555)
: +- Scan hive unified_group_review_cri_group.crs_recent_30d_sf_dim_cust_info [cust_id#13555, prmry_reside_cntry_code#13561], HiveTableRelation [`unified_group_review_cri_group`.`crs_recent_30d_sf_dim_cust_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [cust_id#13555, acct_cre_dt#13556, is_guest_y_n#13557, prmry_email_domain#13558, cust_first_name#..., Partition Cols: [dt#13573], Pruned Partitions: [(dt=2023-03-23)]], [isnotnull(dt#13573), (dt#13573 = 2023-03-23)]
+- *(3) Project [customer_id#11927, device_name#11949]
+- *(3) Filter (((NOT (device_name#11949 = #) AND isnotnull(device_name#11949)) AND NOT (device_name#11949 = ~)) AND isnotnull(customer_id#11927))
+- *(3) ColumnarToRow
+- FileScan parquet pp_risk_ops_qh_tables.magnes_fraudnet_login_raw[customer_id#11927,device_name#11949,ts#12099,event_dt#12100] Batched: true, DataFilters: [NOT (device_name#11949 = #), isnotnull(device_name#11949), NOT (device_name#11949 = ~), isnotnul..., Format: Parquet, Location: InMemoryFileIndex[gs://pypl-bkt-prd-row-std-gds-non-edw-tables/apps/risk/ads/rda/magnes_fraudnet_..., PartitionFilters: [isnotnull(event_dt#12100), (cast(event_dt#12100 as date) >= 19409), (event_dt#12100 <= 2023-03-23)], PushedFilters: [Not(EqualTo(device_name,#)), IsNotNull(device_name), Not(EqualTo(device_name,~)), IsNotNull(cust..., ReadSchema: struct<customer_id:string,device_name:string>
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:162)
  at org.apache.spark.sql.execution.adaptive.QueryStageExec.$anonfun$materialize$1(QueryStageExec.scala:80)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:80)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$4(AdaptiveSparkPlanExec.scala:196)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$4$adapted(AdaptiveSparkPlanExec.scala:194)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:194)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:180)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:296)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.SortExec.doExecute(SortExec.scala:112)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:184)
  ... 42 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: Device_Name_Country#13624
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:75)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:307)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:74)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:96)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReferences(BoundAttribute.scala:96)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doConsumeWithKeys(HashAggregateExec.scala:828)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doConsume(HashAggregateExec.scala:156)
  at org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:221)
  at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:192)
  at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
  at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:496)
  at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:483)
  at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:456)
  at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:496)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
  at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
  at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:496)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:733)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
  at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:655)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:718)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:122)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:121)
  at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:162)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 67 more
Caused by: java.lang.RuntimeException: Couldn't find Device_Name_Country#13624 in [cust_id#13555,devc_name_cast#13554,prmry_reside_cntry_code#13561]
  at scala.sys.package$.error(package.scala:30)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 121 more
```
Best Regards
Kelly Zhang