pratyakshsharma commented on a change in pull request #1558:
URL: https://github.com/apache/incubator-hudi/pull/1558#discussion_r416469107
##########
File path: hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
##########
@@ -103,24 +105,51 @@ class DedupeSparkJob(basePath: String,
// Mark all files except the one with latest commits for deletion
dupeMap.foreach(rt => {
val (key, rows) = rt
- var maxCommit = -1L
-
- rows.foreach(r => {
- val c = r(3).asInstanceOf[String].toLong
- if (c > maxCommit)
- maxCommit = c
- })
-
- rows.foreach(r => {
- val c = r(3).asInstanceOf[String].toLong
- if (c != maxCommit) {
- val f = r(2).asInstanceOf[String].split("_")(0)
- if (!fileToDeleteKeyMap.contains(f)) {
- fileToDeleteKeyMap(f) = HashSet[String]()
+
+ if (useCommitTimeForDedupe) {
+ /*
+ This corresponds to the case where duplicates got created due to INSERT and have never been updated.
+ */
+ var maxCommit = -1L
+
+ rows.foreach(r => {
+ val c = r(3).asInstanceOf[String].toLong
+ if (c > maxCommit)
+ maxCommit = c
+ })
+ rows.foreach(r => {
+ val c = r(3).asInstanceOf[String].toLong
+ if (c != maxCommit) {
+ val f = r(2).asInstanceOf[String].split("_")(0)
+ if (!fileToDeleteKeyMap.contains(f)) {
+ fileToDeleteKeyMap(f) = HashSet[String]()
+ }
+ fileToDeleteKeyMap(f).add(key)
}
- fileToDeleteKeyMap(f).add(key)
+ })
+ } else {
+ /*
+ This corresponds to the case where duplicates have been updated at least once.
+ Once updated, duplicates are bound to have the same commit time unless forcefully modified.
+ */
+ val size = rows.size - 1
+ var i = 0
+ val loop = new Breaks
+ loop.breakable {
Review comment:
Right. Thank you for suggesting this; I am not very hands-on with Scala
yet. :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]