Hi Dhiren,

Based on Nick's advice, you can try adjusting the property "spark.broadcast.blockSize" in the "spark.config" field of env.json.
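For example, here is a minimal sketch of the relevant fragment, assuming the "config" map sits under the "spark" block of your env.json (other keys omitted; the exact layout can differ between Griffin versions) and using illustrative values rather than tuned settings. It also includes the auto broadcast join threshold Nick referred to:

    "spark": {
      "config": {
        "spark.sql.autoBroadcastJoinThreshold": "-1",
        "spark.broadcast.blockSize": "8m"
      }
    }

Setting spark.sql.autoBroadcastJoinThreshold to -1 stops Spark from choosing a broadcast join automatically, so the accuracy join falls back to a shuffle-based join that does not need to hold one side in memory; a positive byte value (the default is 10485760, i.e. 10 MB) raises or lowers the threshold instead of disabling it.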
Thanks,
Lionel

On Wed, Dec 5, 2018 at 4:07 AM Nick Sokolov <[email protected]> wrote:

> Error looks like Spark is trying to do a broadcast join, but the data size
> for the broadcast is too large. It makes sense to try adjusting the Spark
> properties in Griffin to disable auto broadcast join, or to adjust the
> broadcast join threshold.
>
> On Mon, Dec 3, 2018, 9:20 AM Dhiren Sangani <[email protected]> wrote:
>
>> Hi Lionel,
>>
>> Thanks for the reply.
>>
>> Below are the parameters being used while submitting the job:
>>
>> "numExecutors": 10,
>> "executorCores": 6,
>> "driverMemory": "10g",
>> "executorMemory": "7g"
>>
>> I have 2 data nodes configured with YARN, each with 80G memory. Each
>> server has 128G of physical RAM.
>> I don’t have an hourly-level partition in my data; it’s day-level only.
>>
>> I tried removing extra columns from both tables (source and target) using
>> spark-sql rules, and after that the job succeeded.
>> There are 23 columns in the table, and both source and target have the
>> same number of columns.
>>
>> But if I use the data with all the columns, it fails.
>>
>> Thanks,
>> Dhiren
>>
>> From: Lionel Liu <[email protected]>
>> Sent: Monday, December 3, 2018 1:35 AM
>> To: [email protected]; Dhiren Sangani <[email protected]>
>> Cc: [email protected]
>> Subject: Re: Accuracy measure fails on large dataset
>>
>> Hi Dhiren,
>>
>> How many resources are you using when submitting this job?
>> For large-scale data, the simple solution is to use more resources for
>> the calculation job.
>> With limited resources, a common solution is to partition your data by
>> date or hour to make each partition smaller; then you can calculate the
>> partial data each time.
>>
>> Thanks,
>> Lionel
>>
>> On Mon, Dec 3, 2018 at 3:33 PM Dhiren Sangani <[email protected]> wrote:
>>
>> Hi Dev Team,
>>
>> I am facing an issue with the Accuracy measure while running a job on a
>> larger dataset (1B records).
>> I have source and target tables in Hive and am using a simple accuracy
>> rule as below:
>>
>> Source.Column1 = target.column1 AND source.column2 = target.column2
>>
>> I am getting the below exception while running the job.
>>
>> 18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
>> java.util.concurrent.TimeoutException: Futures timed out after [3600 seconds]
>>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
>>   at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>   at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
>>   at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:185)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>>   at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
>>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>>   at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
>>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
>>   at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
>>   at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
>>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
>>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
>>   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727)
>>   at org.apache.griffin.measure.step.write.MetricWriteStep.getMetricMaps(MetricWriteStep.scala:86)
>>   at org.apache.griffin.measure.step.write.MetricWriteStep.execute(MetricWriteStep.scala:46)
>>   at org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:37)
>>   at org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:36)
>>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>>   at org.apache.griffin.measure.step.SeqDQStep.execute(SeqDQStep.scala:36)
>>   at org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:31)
>>   at org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:30)
>>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>>   at org.apache.griffin.measure.job.DQJob.execute(DQJob.scala:30)
>>   at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply$mcZ$sp(BatchDQApp.scala:103)
>>   at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
>>   at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
>>   at scala.util.Try$.apply(Try.scala:192)
>>   at org.apache.griffin.measure.launch.batch.BatchDQApp.run(BatchDQApp.scala:67)
>>   at org.apache.griffin.measure.Application$.main(Application.scala:88)
>>   at org.apache.griffin.measure.Application.main(Application.scala)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>   at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
>> 18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()
>>
>> I tried to run the job with half of the dataset (500M records) and it ran
>> successfully.
>> Below is the log of the metric:
>> 18/11/30 20:37:31 INFO MetricWriteStep: metricMaps =>
>> WrappedArray(Map(total -> 508880897, miss -> 0, matched -> 508880897,
>> matchedFraction -> 1.0))
>>
>> But if I try to run it even with 700M records, it fails:
>> 18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
>> 18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()
>>
>> Is it something related to the join query used inside
>> AccuracyExpr2DQSteps.scala to calculate miss records?
>>
>> Any pointers will be appreciated.
>>
>> Thanks,
>> Dhiren
>>
>
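On the AccuracyExpr2DQSteps question: the sketch below is not the actual Griffin code, just a simplified Spark version of the kind of miss-finding join an accuracy rule boils down to, with placeholder table and column names. It shows where the broadcast plan in your stack trace comes from and how disabling the auto broadcast threshold pushes Spark toward a shuffle-based join instead.

    import org.apache.spark.sql.SparkSession

    object AccuracyJoinSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("accuracy-join-sketch")
          // Same effect as putting the property in env.json's spark.config:
          // never let Spark pick a broadcast join on its own.
          .config("spark.sql.autoBroadcastJoinThreshold", "-1")
          .enableHiveSupport()
          .getOrCreate()

        // "source" and "target" stand in for the two Hive tables; a rule like
        // "source.column1 = target.column1 AND source.column2 = target.column2"
        // is roughly a left join that keeps the unmatched (missed) source rows.
        // With the threshold disabled above, Spark plans a sort-merge join here
        // rather than broadcasting one side.
        val miss = spark.sql(
          """
            |SELECT source.*
            |FROM source
            |LEFT JOIN target
            |  ON source.column1 = target.column1
            | AND source.column2 = target.column2
            |WHERE target.column1 IS NULL
          """.stripMargin)

        println(s"miss count: ${miss.count()}")
        spark.stop()
      }
    }

If the physical plan still shows a BroadcastNestedLoopJoin after disabling the threshold, the real join condition is probably not a plain equi-join (for example because of OR or null-handling clauses), and in that case the resource and partitioning suggestions quoted above are the safer route.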
