ehurheap commented on issue #6194:
URL: https://github.com/apache/hudi/issues/6194#issuecomment-1213323341

Sorry for the confusion @nsivabalan. I reviewed the commands you specified to verify they were the same as what I tried. The main differences between what you did and our situation:

- Our Hudi table was loaded by two separate processes: one bulk_insert job and one streaming ingest job.
- In addition, our Hudi table is MOR.

I can run a Spark query similar to yours and verify there are duplicates in the given partition.
```scala
val dupePath = "s3://heap-datalake-storage/data/tables/events/env_id=123456789/week=20220711"

val ddF = spark.read.parquet(dupePath)
ddF.createOrReplaceTempView("hoh")

val dupQ = s"""
     |select _hoodie_partition_path, _hoodie_record_key, count(*)
     | from hoh group by _hoodie_partition_path, _hoodie_record_key
     | order by count(*) desc
     |""".stripMargin
spark.sql(dupQ).show(false)
```
Sample output:
```
+------------------------------+-------------------------------------------------------------------+--------+
|_hoodie_partition_path        |_hoodie_record_key                                                 |count(1)|
+------------------------------+-------------------------------------------------------------------+--------+
|env_id=123456789/week=20220711|env_id:123456789,user_id:7806358957060773,event_id:5758152328327473|3       |
|env_id=123456789/week=20220711|env_id:123456789,user_id:5403332495352077,event_id:3536309858058402|2       |
|env_id=123456789/week=20220711|env_id:123456789,user_id:4713648045477470,event_id:8717656941318904|2       |
|env_id=123456789/week=20220711|env_id:123456789,user_id:2025910439767252,event_id:8549159234261693|2       |
|env_id=123456789/week=20220711|env_id:123456789,user_id:7507696929673571,event_id:3179806702204642|2       |
|env_id=123456789/week=20220711|env_id:123456789,user_id:7301684312119961,event_id:717438862368076 |2       |
```
   
But when I run this hudi-cli command:

`hudi:events->repair deduplicate --duplicatedPartitionPath "env_id=123/week=20220711" --repairedOutputPath /tmp/hhdeduplicates --sparkMaster local[2] --sparkMemory 4G --dryrun true --dedupeType "upsert_type"`
   
The output I get is:

```
22/08/12 16:27:21 ERROR SparkMain: Fail to execute commandString
org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []; line 5 pos 15;
'UnresolvedHaving ('dupe_cnt > 1)
+- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0, count(1) AS dupe_cnt#1L]
   +- SubqueryAlias htbl_1660321638341
      +- View (`htbl_1660321638341`, [])
         +- LocalRelation <empty>
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:573)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:573)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:209)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    ...

Deduplication failed!
```
   
There are no files in the `--repairedOutputPath` location. My understanding is that if repaired data had been written there, we would use it to replace what is currently in Hudi: first delete the duplicates from the Hudi table, then load the deduplicated data back from the repairedOutputPath location.

But since we have no repaired, deduplicated data, we are stuck. Does that make sense? Have I missed something?

