Hi Jack,

Trying to understand what your goal is. DedupeSparkJob is used for repairing datasets that contain duplicates. Is this your intention?

The mailing list does not show images. :( Can you please post the code here or in a gist? I can take a look.
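For context, the step that fails in your trace (DedupeSparkJob.getDupeKeyDF) essentially registers the parquet files it finds under the partition as a temp table and then groups by _hoodie_record_key to find keys that occur more than once. A rough sketch of that query, not the exact Hudi code (it reads the partition directory directly here, and the path is just copied from your command):

    import org.apache.spark.sql.SparkSession

    // Rough sketch only -- the real DedupeSparkJob builds its table from the
    // specific files it lists under the partition, not from the raw directory.
    val spark = SparkSession.builder().appName("dedupe-sketch").getOrCreate()

    // Partition path taken from your spark-submit command; adjust as needed.
    val partition = "s3a://vungle2-dataeng/temp/stage20190422/2019-04-22_07"

    spark.read.parquet(partition).createOrReplaceTempView("htbl")

    // Find record keys that occur more than once -- same shape as the plan in
    // your stack trace (Aggregate on _hoodie_record_key, then Filter dupe_cnt > 1).
    val dupeKeys = spark.sql(
      """SELECT dupe_key, dupe_cnt
        |FROM (
        |  SELECT `_hoodie_record_key` AS dupe_key, count(*) AS dupe_cnt
        |  FROM htbl
        |  GROUP BY `_hoodie_record_key`
        |) t
        |WHERE dupe_cnt > 1""".stripMargin)
    dupeKeys.show(20, false)

In your run the file listing came back empty ("List of files under partition: ()"), so the table the real job registers has no columns at all. That is why the analyzer fails with "cannot resolve '_hoodie_record_key' given input columns: []" even though the query itself is fine.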
Thanks
Vinoth

On Wed, Apr 24, 2019 at 3:57 AM Jack Wang <[email protected]> wrote:

> Hi folks,
>
> I have an issue when using Hudi to dedup; it doesn't seem to work. Below
> is the command I tried with Spark:
>
> spark-submit --master "spark://XXX.XXX.XXX.39:6066" \
>   --deploy-mode cluster \
>   --conf spark.sql.warehouse.dir=s3a://vungle2-dataeng/temp/dw/ \
>   --conf spark.eventLog.enabled=false \
>   --conf spark.hadoop.fs.s3a.secret.key=XXXXXXXX \
>   --conf spark.hadoop.fs.s3a.access.key=XXXXXXXX \
>   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
>   --total-executor-cores 6 --executor-memory 1g \
>   --jars s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-client-0.4.6-SNAPSHOT.jar,s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-common-0.4.6-SNAPSHOT.jar \
>   --class com.uber.hoodie.cli.commands.SparkMain \
>   s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-cli-0.4.6-SNAPSHOT.jar \
>   DEDUPLICATE 2019-04-22_07/ \
>   s3a://vungle2-dataeng/temp/stage20190422/2019-04-22_07 \
>   s3a://vungle2-dataeng/jun-test/stage20190422
>
> It always throws an exception like the one below. From the error message, I checked
> the parquet files in partition 2019-04-22_07 and found 43 files there, but the Hudi
> CLI cannot enumerate the files and just returns an empty list, which makes the Spark
> SQL used to find duplicated record keys fail.
>
> Lastly, I located the code which raises the exception:
> [image: Screen Shot 2019-04-24 at 6.48.49 PM.png]
>
> The highlighted line returns an empty file list. Could anyone help explain
> the potential reason? Thanks very much.
>
> Below is the exception message for diagnosis:
>
> =============================================================================
> 19/04/24 09:50:11 INFO DedupeSparkJob: List of files under partition: () =>
> 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.19.100.10:57930) with ID 4
> 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-19-106-60:46877 with 366.3 MB RAM, BlockManagerId(5, ip-172-19-106-60, 46877, None)
> 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-19-100-10:32772 with 366.3 MB RAM, BlockManagerId(4, ip-172-19-100-10, 32772, None)
> 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.19.104.28:46117) with ID 0
> 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.19.102.90:32967) with ID 2
> 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-19-104-28:33312 with 366.3 MB RAM, BlockManagerId(0, ip-172-19-104-28, 33312, None)
> 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.19.111.216:58010) with ID 1
> 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-19-102-90:35450 with 366.3 MB RAM, BlockManagerId(2, ip-172-19-102-90, 35450, None)
> 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-19-111-216:34525 with 366.3 MB RAM, BlockManagerId(1, ip-172-19-111-216, 34525, None)
> 19/04/24 09:50:12 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('s3a://vungle2-dataeng/temp/dw/').
> 19/04/24 09:50:12 INFO SharedState: Warehouse path is 's3a://vungle2-dataeng/temp/dw/'.
> 19/04/24 09:50:13 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
> Exception in thread "main" java.lang.reflect.InvocationTargetException
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
>   at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`_hoodie_record_key`' given input columns: []; line 5 pos 15;
> 'Filter ('dupe_cnt > 1)
> +- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0, count(1) AS dupe_cnt#1L]
>    +- SubqueryAlias htbl_1556099410774
>       +- LogicalRDD false
>
>   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:118)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:122)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
>   at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
>   at com.uber.hoodie.cli.DedupeSparkJob.getDupeKeyDF(DedupeSparkJob.scala:62)
>   at com.uber.hoodie.cli.DedupeSparkJob.planDuplicateFix(DedupeSparkJob.scala:93)
>   at com.uber.hoodie.cli.DedupeSparkJob.fixDuplicates(DedupeSparkJob.scala:142)
>   at com.uber.hoodie.cli.commands.SparkMain.deduplicatePartitionPath(SparkMain.java:217)
>   at com.uber.hoodie.cli.commands.SparkMain.main(SparkMain.java:62)
>   ... 6 more
>
> --
> *Jianbin Wang*
> Sr. Engineer II, Data
> +86 18633600964
>
> Units 3801, 3804, 38F, C Block, Beijing Yintai Center, Beijing, China
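One quick check that may help narrow down the empty listing under 2019-04-22_07: from a spark-shell started with the same fs.s3a.* settings as the spark-submit above, confirm that a plain Hadoop FileSystem listing of the partition path actually returns the 43 parquet files. This is just a sanity-check sketch, not part of the Hudi CLI, and the path is copied from your command:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Same partition path as in the spark-submit command quoted above.
    val partitionPath = new Path("s3a://vungle2-dataeng/temp/stage20190422/2019-04-22_07")

    // Reuse the running session's Hadoop configuration so the same fs.s3a.*
    // settings (keys, implementation class) are picked up.
    val fs = FileSystem.get(partitionPath.toUri, spark.sparkContext.hadoopConfiguration)

    val parquetFiles = fs.listStatus(partitionPath)
      .filter(_.getPath.getName.endsWith(".parquet"))

    println(s"Found ${parquetFiles.length} parquet files under $partitionPath")
    parquetFiles.foreach(f => println(f.getPath))

If this lists the files you expect, the S3A side looks fine and it is worth double-checking the partition and base-path arguments passed to DEDUPLICATE; if it lists nothing, the credentials or path the driver uses are the first thing to look at.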
