psendyk commented on code in PR #8627:
URL: https://github.com/apache/hudi/pull/8627#discussion_r1184277562
##########
hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala:
##########
@@ -100,81 +106,31 @@ class DedupeSparkJob(basePath: String,
getDedupePlan(dupeMap)
}
- private def getDedupePlan(dupeMap: Map[String, Buffer[Row]]):
HashMap[String, HashSet[String]] = {
- val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
+ private def getDedupePlan(
+ dupeMap: Map[String, Buffer[Row]]
+ ): HashMap[String, HashMap[String, Int]] = {
+ val fileToDeleteKeyMap = new HashMap[String, HashMap[String, Int]]()
dupeMap.foreach(rt => {
val (key, rows) = rt
dedupeType match {
- case DeDupeType.UPDATE_TYPE =>
- /*
- This corresponds to the case where all duplicates have been updated
at least once.
- Once updated, duplicates are bound to have same commit time unless
forcefully modified.
- */
- rows.init.foreach(r => {
- val f = r(2).asInstanceOf[String].split("_")(0)
- if (!fileToDeleteKeyMap.contains(f)) {
- fileToDeleteKeyMap(f) = HashSet[String]()
- }
- fileToDeleteKeyMap(f).add(key)
- })
-
case DeDupeType.INSERT_TYPE =>
- /*
- This corresponds to the case where duplicates got created due to
INSERT and have never been updated.
- */
- var maxCommit = -1L
-
- rows.foreach(r => {
- val c = r(3).asInstanceOf[String].toLong
- if (c > maxCommit)
- maxCommit = c
- })
-
- rows.foreach(r => {
- val c = r(3).asInstanceOf[String].toLong
- if (c != maxCommit) {
- val f = r(2).asInstanceOf[String].split("_")(0)
- if (!fileToDeleteKeyMap.contains(f)) {
- fileToDeleteKeyMap(f) = HashSet[String]()
- }
- fileToDeleteKeyMap(f).add(key)
- }
- })
-
- case DeDupeType.UPSERT_TYPE =>
- /*
- This corresponds to the case where duplicates got created as a
result of inserts as well as updates,
- i.e few duplicate records have been updated, while others were never
updated.
- */
- var maxCommit = -1L
-
- rows.foreach(r => {
- val c = r(3).asInstanceOf[String].toLong
- if (c > maxCommit)
- maxCommit = c
- })
- val rowsWithMaxCommit = new ListBuffer[Row]()
- rows.foreach(r => {
- val c = r(3).asInstanceOf[String].toLong
- if (c != maxCommit) {
- val f = r(2).asInstanceOf[String].split("_")(0)
- if (!fileToDeleteKeyMap.contains(f)) {
- fileToDeleteKeyMap(f) = HashSet[String]()
+ val recordsMarkedForDeletion = new HashSet[String]()
+
+ rows
+ .slice(1, rows.length)
Review Comment:
Keep the first record instance; the rows don't need to be ordered by commit time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]