Error looks like Spark is trying to do broadcast join, but data size for broadcast is too large. It makes sense to try adjusting spark properties in Griffin to disable auto broadcast join or adjusting broadcast join threshold.
On Mon, Dec 3, 2018, 9:20 AM Dhiren Sangani <[email protected]> wrote: > Hi Lionel, > > Thanks for the reply. > > Below are the parameters being used while submitting the job. > > "numExecutors": 10, > "executorCores": 6, > "driverMemory": "10g", > "executorMemory": "7g" > > I have 2 Data nodes configured with Yarn each with 80G memory. Each server > has 128G physical RAM. > I don’t have hourly level partition in my data. It’s day level only. > > I tried removing extra columns from both the table (source, target) using > spark-sql rules and after that job got succeeded. > There are 23 columns in table and both source and target has same number > of columns. > > But If I use data with all the columns, it’s failing. > > Thanks, > Dhiren > > From: Lionel Liu <[email protected]> > Sent: Monday, December 3, 2018 1:35 AM > To: [email protected]; Dhiren Sangani <[email protected]> > Cc: [email protected] > Subject: Re: Accuracy measure fails on large dataset > > Hi Dhiren, > > How many resources are your using when submit this job? > For large scale data, the simple solution is to use more resources for the > calculation job. > For limited resources, a common solution is to partition your data by date > or hour, to make it smaller in each partition, then you can calculate the > partial data each time. > > Thanks, > Lionel > > > > > On Mon, Dec 3, 2018 at 3:33 PM Dhiren Sangani <[email protected] > <mailto:[email protected]>> wrote: > Hi Dev Team, > > I am facing issue with Accuracy measure while running job on larger > dataset (1B records). > I have source and target tables in Hive and using simple accuracy rule as > below: > > Source.Column1 = target.column1 AND source.column2 = target.column2 > > Getting below exception while running the job. > > 18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails > java.util.concurrent.TimeoutException: Futures timed out after [3600 > seconds] > at > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) > at > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201) > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at > org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140) > at > org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371) > at > org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371) > at > org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:185) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371) > at > org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > at > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) > at org.apache.spark.sql.Dataset.org< > http://org.apache.spark.sql.Dataset.org > >$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278) > at > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727) > at > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727) > at > org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258) > at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727) > at > org.apache.griffin.measure.step.write.MetricWriteStep.getMetricMaps(MetricWriteStep.scala:86) > at > org.apache.griffin.measure.step.write.MetricWriteStep.execute(MetricWriteStep.scala:46) > at > org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:37) > at > org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:36) > at > scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) > at scala.collection.immutable.List.foldLeft(List.scala:84) > at > org.apache.griffin.measure.step.SeqDQStep.execute(SeqDQStep.scala:36) > at > org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:31) > at > org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:30) > at > scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) > at scala.collection.immutable.List.foldLeft(List.scala:84) > at org.apache.griffin.measure.job.DQJob.execute(DQJob.scala:30) > at > org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply$mcZ$sp(BatchDQApp.scala:103) > at > org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67) > at > org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67) > at scala.util.Try$.apply(Try.scala:192) > at > org.apache.griffin.measure.launch.batch.BatchDQApp.run(BatchDQApp.scala:67) > at > org.apache.griffin.measure.Application$.main(Application.scala:88) > at org.apache.griffin.measure.Application.main(Application.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721) > 18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List() > > I tried to run Job with half of the data sets (500M) and it run > successfully. > Below is the log of the metric. > 18/11/30 20:37:31 INFO MetricWriteStep: metricMaps => > WrappedArray(Map(total -> 508880897, miss -> 0, matched -> 508880897, > matchedFraction -> 1.0)) > > But if I try to run it even with 700M records, it fails. > 18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails > 18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List() > > Is it something related to Join query being used inside > AccuracyExpr2DQSteps.scala to calculate miss records? > > Any pointers will be appreciated. > > Thanks, > Dhiren >
