Hi Dhiren,

It seems that resources are not your limiting factor. I suspect there may be data skew in your data; you can check this by monitoring the running status of the Spark job. If there is data skew, there are several ways to fix it, and you can also search online for them.
Thanks,
Lionel, Liu

From: Dhiren Sangani
Sent: December 4, 2018 1:20
To: Lionel Liu; [email protected]
Cc: [email protected]
Subject: RE: Accuracy measure fails on large dataset

Hi Lionel,

Thanks for the reply. Below are the parameters used when submitting the job:

    "numExecutors": 10,
    "executorCores": 6,
    "driverMemory": "10g",
    "executorMemory": "7g"

I have 2 data nodes configured with YARN, each with 80G of memory. Each server has 128G of physical RAM.
My data has no hourly partitions; it is partitioned at the day level only.
I tried removing the extra columns from both tables (source, target) using spark-sql rules, and after that the job succeeded. The table has 23 columns, and source and target have the same number of columns. But if I use the data with all the columns, the job fails.

Thanks,
Dhiren

From: Lionel Liu <[email protected]>
Sent: Monday, December 3, 2018 1:35 AM
To: [email protected]; Dhiren Sangani <[email protected]>
Cc: [email protected]
Subject: Re: Accuracy measure fails on large dataset

Hi Dhiren,

How many resources are you using when you submit this job?
For large-scale data, the simple solution is to use more resources for the calculation job.
With limited resources, a common solution is to partition your data by date or hour so that each partition is smaller; you can then run the calculation on one part of the data at a time.

Thanks,
Lionel

On Mon, Dec 3, 2018 at 3:33 PM Dhiren Sangani <[email protected]> wrote:

Hi Dev Team,

I am facing an issue with the Accuracy measure while running a job on a larger dataset (1B records).
I have source and target tables in Hive and am using a simple accuracy rule, as below:

    Source.Column1 = target.column1 AND source.column2 = target.column2

I get the exception below when running the job.
18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
java.util.concurrent.TimeoutException: Futures timed out after [3600 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:185)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727)
    at org.apache.griffin.measure.step.write.MetricWriteStep.getMetricMaps(MetricWriteStep.scala:86)
    at org.apache.griffin.measure.step.write.MetricWriteStep.execute(MetricWriteStep.scala:46)
    at org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:37)
    at org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:36)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.griffin.measure.step.SeqDQStep.execute(SeqDQStep.scala:36)
    at org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:31)
    at org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:30)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.griffin.measure.job.DQJob.execute(DQJob.scala:30)
    at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply$mcZ$sp(BatchDQApp.scala:103)
    at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
    at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.griffin.measure.launch.batch.BatchDQApp.run(BatchDQApp.scala:67)
    at org.apache.griffin.measure.Application$.main(Application.scala:88)
    at org.apache.griffin.measure.Application.main(Application.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()

I tried to run the job with half of the dataset (500M records) and it ran successfully. Below is the metric log.

18/11/30 20:37:31 INFO MetricWriteStep: metricMaps => WrappedArray(Map(total -> 508880897, miss -> 0, matched -> 508880897, matchedFraction -> 1.0))

But if I try to run it with even 700M records, it fails.

18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()

Is this related to the join query used inside AccuracyExpr2DQSteps.scala to calculate the miss records? Any pointers will be appreciated.

Thanks,
Dhiren
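
A note on the data-skew check suggested at the top of this thread: one rough way to look for skew is to count how many rows share each join-key value and compare the largest count with the average. The snippet below is only a sketch in plain Python on a made-up sample; the function name `skew_report` and the sample keys are hypothetical, and on the real tables the counts would more likely come from grouping by the rule columns (column1, column2) in Spark or Hive.

```python
from collections import Counter


def skew_report(join_keys, top_n=3):
    """Summarize how unevenly rows are spread across join-key values."""
    counts = Counter(join_keys)
    if not counts:
        raise ValueError("no join keys given")
    total = sum(counts.values())
    mean = total / len(counts)          # average rows per distinct key
    top = counts.most_common(top_n)     # heaviest keys first
    return {
        "distinct_keys": len(counts),
        "mean_rows_per_key": mean,
        "max_rows_per_key": top[0][1],
        "skew_ratio": top[0][1] / mean,  # well above 1 suggests skew
        "top_keys": top,
    }


# Hypothetical sample of (column1, column2) keys in which one key dominates.
sample = [("a", 1)] * 97 + [("b", 2), ("c", 3), ("d", 4)]
report = skew_report(sample)
print(report)
```

A skew ratio far above 1 means a few keys carry most of the rows, so the tasks handling those keys will run much longer than the rest.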

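
The partition-by-date approach mentioned in the thread could be combined into one overall result roughly as follows. This is a sketch under assumptions, not Griffin's own code: it assumes the accuracy job is run once per day partition, that a source record's matching target record is always in the same day partition, and that each run yields the `total` and `miss` counts seen in the metric log. The function name `merge_accuracy` and the per-day numbers are hypothetical.

```python
def merge_accuracy(partials):
    """Combine per-partition accuracy counts into one overall metric map."""
    total = sum(p["total"] for p in partials)
    miss = sum(p["miss"] for p in partials)
    matched = total - miss
    return {
        "total": total,
        "miss": miss,
        "matched": matched,
        # The fraction is undefined when there are no records at all.
        "matchedFraction": matched / total if total else None,
    }


# Hypothetical results from two day-level runs.
daily = [
    {"total": 300, "miss": 0},
    {"total": 200, "miss": 50},
]
overall = merge_accuracy(daily)
print(overall)
```

Summing the counts before dividing matters here: averaging the per-day fractions would weight a small day the same as a large one.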