Hi Dhiren,

Based on the advisement from Nick, you can try to adjust the property "
spark.broadcast.blockSize" in spark.config field in env.json.

Thanks,
Lionel

On Wed, Dec 5, 2018 at 4:07 AM Nick Sokolov <[email protected]> wrote:

> Error looks like Spark is trying to do broadcast join, but data size for
> broadcast is too large. It makes sense to try adjusting spark properties in
> Griffin to disable auto broadcast join or adjusting broadcast join
> threshold.
>
> On Mon, Dec 3, 2018, 9:20 AM Dhiren Sangani <[email protected]>
> wrote:
>
>> Hi Lionel,
>>
>> Thanks for the reply.
>>
>> Below are the parameters being used while submitting the job.
>>
>> "numExecutors": 10,
>> "executorCores": 6,
>> "driverMemory": "10g",
>> "executorMemory": "7g"
>>
>> I have 2 Data nodes configured with Yarn each with 80G memory. Each
>> server has 128G physical RAM.
>> I don’t have hourly level partition in my data. It’s day level only.
>>
>> I tried removing extra columns from both the table (source, target) using
>> spark-sql rules and after that job got succeeded.
>> There are 23 columns in table and both source and target has same number
>> of columns.
>>
>> But If I use data with all the columns, it’s failing.
>>
>> Thanks,
>> Dhiren
>>
>> From: Lionel Liu <[email protected]>
>> Sent: Monday, December 3, 2018 1:35 AM
>> To: [email protected]; Dhiren Sangani <[email protected]>
>> Cc: [email protected]
>> Subject: Re: Accuracy measure fails on large dataset
>>
>> Hi Dhiren,
>>
>> How many resources are your using when submit this job?
>> For large scale data, the simple solution is to use more resources for
>> the calculation job.
>> For limited resources, a common solution is to partition your data by
>> date or hour, to make it smaller in each partition, then you can calculate
>> the partial data each time.
>>
>> Thanks,
>> Lionel
>>
>>
>>
>>
>> On Mon, Dec 3, 2018 at 3:33 PM Dhiren Sangani <[email protected]
>> <mailto:[email protected]>> wrote:
>> Hi Dev Team,
>>
>> I am facing issue with Accuracy measure while running job on larger
>> dataset (1B records).
>> I have source and target tables in Hive and using simple accuracy rule as
>> below:
>>
>> Source.Column1 = target.column1 AND source.column2 = target.column2
>>
>> Getting below exception while running the job.
>>
>> 18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
>> java.util.concurrent.TimeoutException: Futures timed out after [3600
>> seconds]
>>         at
>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>         at
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>         at
>> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
>>         at
>> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
>>         at
>> org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>>         at
>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>         at
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>        at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>>         at
>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>         at
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:185)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>>         at
>> org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
>>         at
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
>>         at org.apache.spark.sql.Dataset.org<
>> http://org.apache.spark.sql.Dataset.org
>> >$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
>>         at
>> org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
>>         at
>> org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
>>         at
>> org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
>>         at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>>         at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
>>         at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727)
>>         at
>> org.apache.griffin.measure.step.write.MetricWriteStep.getMetricMaps(MetricWriteStep.scala:86)
>>         at
>> org.apache.griffin.measure.step.write.MetricWriteStep.execute(MetricWriteStep.scala:46)
>>         at
>> org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:37)
>>         at
>> org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:36)
>>         at
>> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>>         at scala.collection.immutable.List.foldLeft(List.scala:84)
>>         at
>> org.apache.griffin.measure.step.SeqDQStep.execute(SeqDQStep.scala:36)
>>         at
>> org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:31)
>>         at
>> org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:30)
>>         at
>> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>>         at scala.collection.immutable.List.foldLeft(List.scala:84)
>>         at org.apache.griffin.measure.job.DQJob.execute(DQJob.scala:30)
>>         at
>> org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply$mcZ$sp(BatchDQApp.scala:103)
>>         at
>> org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
>>         at
>> org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
>>         at scala.util.Try$.apply(Try.scala:192)
>>         at
>> org.apache.griffin.measure.launch.batch.BatchDQApp.run(BatchDQApp.scala:67)
>>         at
>> org.apache.griffin.measure.Application$.main(Application.scala:88)
>>         at org.apache.griffin.measure.Application.main(Application.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at
>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
>> 18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()
>>
>> I tried to run Job with half of the data sets (500M) and it run
>> successfully.
>> Below is the log of the metric.
>> 18/11/30 20:37:31 INFO MetricWriteStep: metricMaps =>
>> WrappedArray(Map(total -> 508880897, miss -> 0, matched -> 508880897,
>> matchedFraction -> 1.0))
>>
>> But if I try to run it even with 700M records, it fails.
>> 18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
>> 18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()
>>
>> Is it something related to Join query being used inside
>> AccuracyExpr2DQSteps.scala to calculate miss records?
>>
>> Any pointers will be appreciated.
>>
>> Thanks,
>> Dhiren
>>
>

Reply via email to