Hi folks,
I have an issue using Hudi to deduplicate a partition; it doesn't seem to work. Below is
the command I ran with Spark:
spark-submit --master "spark://XXX.XXX.XXX.39:6066" \
--deploy-mode cluster \
--conf spark.sql.warehouse.dir=s3a://vungle2-dataeng/temp/dw/ \
--conf spark.eventLog.enabled=false \
--conf spark.hadoop.fs.s3a.secret.key=XXXXXXXX \
--conf spark.hadoop.fs.s3a.access.key=XXXXXXXX \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--total-executor-cores 6 --executor-memory 1g \
--jars s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-client-0.4.6-SNAPSHOT.jar,s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-common-0.4.6-SNAPSHOT.jar \
--class com.uber.hoodie.cli.commands.SparkMain \
s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-cli-0.4.6-SNAPSHOT.jar \
DEDUPLICATE 2019-04-22_07/ \
s3a://vungle2-dataeng/temp/stage20190422/2019-04-22_07 \
s3a://vungle2-dataeng/jun-test/stage20190422
It always throws the exception shown below. Prompted by the error message, I checked
the Parquet files in partition 2019-04-22_07 and found 43 files there, but the
Hudi CLI fails to enumerate them and returns an empty list. As a result, the
Spark SQL query used to find duplicated record keys fails.
Finally, I located the code that raises the exception:
[image: Screen Shot 2019-04-24 at 6.48.49 PM.png]
The highlighted line returns an empty file list. Could anyone help explain
the potential cause? Thanks very much.
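
To make the failure mode concrete, here is a minimal Python sketch (not Hudi code) of what the duplicate-key query appears to do, judging from the logical plan in the trace: group rows by `_hoodie_record_key` and keep the keys that occur more than once. The function name `find_duplicate_keys` and its `rows`/`columns` parameters are made up for illustration. It shows why an empty file list, which yields an input with no columns at all, fails at column resolution rather than simply returning zero duplicates.

```python
from collections import Counter

# Metadata column named in the AnalysisException.
RECORD_KEY = "_hoodie_record_key"


def find_duplicate_keys(rows, columns):
    """Hypothetical stand-in for the duplicate-key query.

    rows:    list of dicts, one per record read from the partition's files
    columns: set of column names in the input schema
    Returns a dict mapping each duplicated key to its count (count > 1).
    """
    # With no files, the schema is empty, so the key column cannot be
    # resolved. This mirrors "given input columns: []" in the error.
    if RECORD_KEY not in columns:
        raise ValueError(
            f"cannot resolve '{RECORD_KEY}' given input columns: {sorted(columns)}"
        )
    counts = Counter(row[RECORD_KEY] for row in rows)
    return {key: cnt for key, cnt in counts.items() if cnt > 1}


if __name__ == "__main__":
    sample = [
        {RECORD_KEY: "a"},
        {RECORD_KEY: "a"},
        {RECORD_KEY: "b"},
    ]
    print(find_duplicate_keys(sample, {RECORD_KEY}))
    try:
        find_duplicate_keys([], set())
    except ValueError as err:
        print(err)
```

So the AnalysisException looks like a symptom; the real question is why the file listing for the partition comes back empty.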
Below is the exception output for diagnosis:
=============================================================================
19/04/24 09:50:11 INFO DedupeSparkJob: List of files under partition: () =>
19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.19.100.10:57930) with ID 4
19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-19-106-60:46877 with 366.3 MB RAM, BlockManagerId(5, ip-172-19-106-60, 46877, None)
19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-19-100-10:32772 with 366.3 MB RAM, BlockManagerId(4, ip-172-19-100-10, 32772, None)
19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.19.104.28:46117) with ID 0
19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.19.102.90:32967) with ID 2
19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-19-104-28:33312 with 366.3 MB RAM, BlockManagerId(0, ip-172-19-104-28, 33312, None)
19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.19.111.216:58010) with ID 1
19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-19-102-90:35450 with 366.3 MB RAM, BlockManagerId(2, ip-172-19-102-90, 35450, None)
19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-19-111-216:34525 with 366.3 MB RAM, BlockManagerId(1, ip-172-19-111-216, 34525, None)
19/04/24 09:50:12 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('s3a://vungle2-dataeng/temp/dw/').
19/04/24 09:50:12 INFO SharedState: Warehouse path is 's3a://vungle2-dataeng/temp/dw/'.
19/04/24 09:50:13 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`_hoodie_record_key`' given input columns: []; line 5 pos 15;
'Filter ('dupe_cnt > 1)
+- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0, count(1) AS dupe_cnt#1L]
   +- SubqueryAlias htbl_1556099410774
      +- LogicalRDD false
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:122)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
at com.uber.hoodie.cli.DedupeSparkJob.getDupeKeyDF(DedupeSparkJob.scala:62)
at com.uber.hoodie.cli.DedupeSparkJob.planDuplicateFix(DedupeSparkJob.scala:93)
at com.uber.hoodie.cli.DedupeSparkJob.fixDuplicates(DedupeSparkJob.scala:142)
at com.uber.hoodie.cli.commands.SparkMain.deduplicatePartitionPath(SparkMain.java:217)
at com.uber.hoodie.cli.commands.SparkMain.main(SparkMain.java:62)
... 6 more
--
Jianbin Wang