Hi Dev Team,

I am facing issue with Accuracy measure while running job on larger dataset (1B 
records).
I have source and target tables in Hive and using simple accuracy rule as below:

Source.Column1 = target.column1 AND source.column2 = target.column2

Getting below exception while running the job.

18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
java.util.concurrent.TimeoutException: Futures timed out after [3600 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
        at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at 
org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
        at 
org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at 
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
       at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at 
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:185)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at 
org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
        at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
        at 
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
        at 
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727)
        at 
org.apache.griffin.measure.step.write.MetricWriteStep.getMetricMaps(MetricWriteStep.scala:86)
        at 
org.apache.griffin.measure.step.write.MetricWriteStep.execute(MetricWriteStep.scala:46)
        at 
org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:37)
        at 
org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:36)
        at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.griffin.measure.step.SeqDQStep.execute(SeqDQStep.scala:36)
        at 
org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:31)
        at 
org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:30)
        at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.griffin.measure.job.DQJob.execute(DQJob.scala:30)
        at 
org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply$mcZ$sp(BatchDQApp.scala:103)
        at 
org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
        at 
org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
        at scala.util.Try$.apply(Try.scala:192)
        at 
org.apache.griffin.measure.launch.batch.BatchDQApp.run(BatchDQApp.scala:67)
        at org.apache.griffin.measure.Application$.main(Application.scala:88)
        at org.apache.griffin.measure.Application.main(Application.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()

I tried to run Job with half of the data sets (500M) and it run successfully.
Below is the log of the metric.
18/11/30 20:37:31 INFO MetricWriteStep: metricMaps => WrappedArray(Map(total -> 
508880897, miss -> 0, matched -> 508880897, matchedFraction -> 1.0))

But if I try to run it even with 700M records, it fails.
18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()

Is it something related to Join query being used inside 
AccuracyExpr2DQSteps.scala to calculate miss records?

Any pointers will be appreciated.

Thanks,
Dhiren

Reply via email to