pratyakshsharma commented on a change in pull request #1558:
URL: https://github.com/apache/incubator-hudi/pull/1558#discussion_r429177537



##########
File path: hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
##########
@@ -98,34 +97,92 @@ class DedupeSparkJob(basePath: String,
         ON h.`_hoodie_record_key` = d.dupe_key
                       """
     val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => 
r.getString(0))
-    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
+    getDedupePlan(dupeMap)
+  }
 
-    // Mark all files except the one with latest commits for deletion
+  private def getDedupePlan(dupeMap: Map[String, Buffer[Row]]): 
HashMap[String, HashSet[String]] = {
+    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
     dupeMap.foreach(rt => {
       val (key, rows) = rt
-      var maxCommit = -1L
-
-      rows.foreach(r => {
-        val c = r(3).asInstanceOf[String].toLong
-        if (c > maxCommit)
-          maxCommit = c
-      })
-
-      rows.foreach(r => {
-        val c = r(3).asInstanceOf[String].toLong
-        if (c != maxCommit) {
-          val f = r(2).asInstanceOf[String].split("_")(0)
-          if (!fileToDeleteKeyMap.contains(f)) {
-            fileToDeleteKeyMap(f) = HashSet[String]()
-          }
-          fileToDeleteKeyMap(f).add(key)
-        }
-      })
+
+      dedupeType match {
+        case DeDupeType.updateType =>
+          /*
+          This corresponds to the case where all duplicates have been updated 
at least once.
+          Once updated, duplicates are bound to have same commit time unless 
forcefully modified.
+          */
+          rows.init.foreach(r => {
+            val f = r(2).asInstanceOf[String].split("_")(0)
+            if (!fileToDeleteKeyMap.contains(f)) {
+              fileToDeleteKeyMap(f) = HashSet[String]()
+            }
+            fileToDeleteKeyMap(f).add(key)
+          })
+        case DeDupeType.insertType =>
+          /*
+          This corresponds to the case where duplicates got created due to 
INSERT and have never been updated.
+          */
+          var maxCommit = -1L
+
+          rows.foreach(r => {
+            val c = r(3).asInstanceOf[String].toLong
+            if (c > maxCommit)
+              maxCommit = c
+          })
+          rows.foreach(r => {
+            val c = r(3).asInstanceOf[String].toLong
+            if (c != maxCommit) {
+              val f = r(2).asInstanceOf[String].split("_")(0)
+              if (!fileToDeleteKeyMap.contains(f)) {
+                fileToDeleteKeyMap(f) = HashSet[String]()
+              }
+              fileToDeleteKeyMap(f).add(key)
+            }
+          })
+
+        case DeDupeType.upsertType =>
+          /*
+          This corresponds to the case where duplicates got created as a 
result of inserts as well as updates,
+          i.e few duplicate records have been updated, while others were never 
updated.
+           */
+          var maxCommit = -1L
+
+          rows.foreach(r => {
+            val c = r(3).asInstanceOf[String].toLong
+            if (c > maxCommit)
+              maxCommit = c
+          })
+          val rowsWithMaxCommit = new ListBuffer[Row]()
+          rows.foreach(r => {
+            val c = r(3).asInstanceOf[String].toLong
+            if (c != maxCommit) {
+              val f = r(2).asInstanceOf[String].split("_")(0)
+              if (!fileToDeleteKeyMap.contains(f)) {
+                fileToDeleteKeyMap(f) = HashSet[String]()
+              }
+              fileToDeleteKeyMap(f).add(key)
+            } else {
+              rowsWithMaxCommit += r
+            }
+          })
+
+          rowsWithMaxCommit.toList.init.foreach(r => {
+            val f = r(2).asInstanceOf[String].split("_")(0)
+            if (!fileToDeleteKeyMap.contains(f)) {
+              fileToDeleteKeyMap(f) = HashSet[String]()
+            }
+            fileToDeleteKeyMap(f).add(key)
+          })
+
+        case _ => throw new IllegalArgumentException("Please provide valid 
type for deduping!")
+      }
     })
+    LOG.debug("fileToDeleteKeyMap size : " + fileToDeleteKeyMap.size + ", map: 
" + fileToDeleteKeyMap)

Review comment:
       Done. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to