Hi Dhiren,

Seems like the resources are not your limit. 
I doubt that there might be some data skew in your data, you can find this by 
monitoring the spark job running status.
If there’s data skew, there’re several ways to fix it, you can also google for 
it.

Thanks
Lionel, Liu

From: Dhiren Sangani
Sent: 2018年12月4日 1:20
To: Lionel Liu; [email protected]
Cc: [email protected]
Subject: RE: Accuracy measure fails on large dataset

Hi Lionel,

Thanks for the reply.

Below are the parameters being used while submitting the job.

"numExecutors": 10,
"executorCores": 6,
"driverMemory": "10g",
"executorMemory": "7g"

I have 2 Data nodes configured with Yarn each with 80G memory. Each server has 
128G physical RAM.
I don’t have hourly level partition in my data. It’s day level only.

I tried removing extra columns from both the table (source, target) using 
spark-sql rules and after that job got succeeded.
There are 23 columns in table and both source and target has same number of 
columns.

But If I use data with all the columns, it’s failing.

Thanks,
Dhiren

From: Lionel Liu <[email protected]>
Sent: Monday, December 3, 2018 1:35 AM
To: [email protected]; Dhiren Sangani <[email protected]>
Cc: [email protected]
Subject: Re: Accuracy measure fails on large dataset

Hi Dhiren,

How many resources are your using when submit this job?
For large scale data, the simple solution is to use more resources for the 
calculation job.
For limited resources, a common solution is to partition your data by date or 
hour, to make it smaller in each partition, then you can calculate the partial 
data each time.

Thanks,
Lionel




On Mon, Dec 3, 2018 at 3:33 PM Dhiren Sangani 
<[email protected]<mailto:[email protected]>> wrote:
Hi Dev Team,

I am facing issue with Accuracy measure while running job on larger dataset (1B 
records).
I have source and target tables in Hive and using simple accuracy rule as below:

Source.Column1 = target.column1 AND source.column2 = target.column2

Getting below exception while running the job.

18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
java.util.concurrent.TimeoutException: Futures timed out after [3600 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
        at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at 
org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
        at 
org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at 
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
       at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at 
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:185)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at 
org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
        at 
org.apache.spark.sql.Dataset.org<http://org.apache.spark.sql.Dataset.org>$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
        at 
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
        at 
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727)
        at 
org.apache.griffin.measure.step.write.MetricWriteStep.getMetricMaps(MetricWriteStep.scala:86)
        at 
org.apache.griffin.measure.step.write.MetricWriteStep.execute(MetricWriteStep.scala:46)
        at 
org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:37)
        at 
org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:36)
        at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.griffin.measure.step.SeqDQStep.execute(SeqDQStep.scala:36)
        at 
org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:31)
        at 
org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:30)
        at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.griffin.measure.job.DQJob.execute(DQJob.scala:30)
        at 
org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply$mcZ$sp(BatchDQApp.scala:103)
        at 
org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
        at 
org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
        at scala.util.Try$.apply(Try.scala:192)
        at 
org.apache.griffin.measure.launch.batch.BatchDQApp.run(BatchDQApp.scala:67)
        at org.apache.griffin.measure.Application$.main(Application.scala:88)
        at org.apache.griffin.measure.Application.main(Application.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()

I tried to run Job with half of the data sets (500M) and it run successfully.
Below is the log of the metric.
18/11/30 20:37:31 INFO MetricWriteStep: metricMaps => WrappedArray(Map(total -> 
508880897, miss -> 0, matched -> 508880897, matchedFraction -> 1.0))

But if I try to run it even with 700M records, it fails.
18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()

Is it something related to Join query being used inside 
AccuracyExpr2DQSteps.scala to calculate miss records?

Any pointers will be appreciated.

Thanks,
Dhiren

Reply via email to