yanghua commented on a change in pull request #1558:
URL: https://github.com/apache/hudi/pull/1558#discussion_r447758748



##########
File path: hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
##########
@@ -98,34 +97,92 @@ class DedupeSparkJob(basePath: String,
         ON h.`_hoodie_record_key` = d.dupe_key
                       """
     val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => 
r.getString(0))
-    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
+    getDedupePlan(dupeMap)
+  }
 
-    // Mark all files except the one with latest commits for deletion
+  private def getDedupePlan(dupeMap: Map[String, Buffer[Row]]): 
HashMap[String, HashSet[String]] = {
+    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
     dupeMap.foreach(rt => {
       val (key, rows) = rt
-      var maxCommit = -1L
-
-      rows.foreach(r => {
-        val c = r(3).asInstanceOf[String].toLong
-        if (c > maxCommit)
-          maxCommit = c
-      })
-
-      rows.foreach(r => {
-        val c = r(3).asInstanceOf[String].toLong
-        if (c != maxCommit) {
-          val f = r(2).asInstanceOf[String].split("_")(0)
-          if (!fileToDeleteKeyMap.contains(f)) {
-            fileToDeleteKeyMap(f) = HashSet[String]()
-          }
-          fileToDeleteKeyMap(f).add(key)
-        }
-      })
+
+      dedupeType match {
+        case DeDupeType.UPDATE_TYPE =>
+          /*
+          This corresponds to the case where all duplicates have been updated 
at least once.
+          Once updated, duplicates are bound to have same commit time unless 
forcefully modified.
+          */
+          rows.init.foreach(r => {
+            val f = r(2).asInstanceOf[String].split("_")(0)
+            if (!fileToDeleteKeyMap.contains(f)) {
+              fileToDeleteKeyMap(f) = HashSet[String]()
+            }
+            fileToDeleteKeyMap(f).add(key)
+          })
+        case DeDupeType.INSERT_TYPE =>
+          /*
+          This corresponds to the case where duplicates got created due to 
INSERT and have never been updated.
+          */
+          var maxCommit = -1L
+
+          rows.foreach(r => {
+            val c = r(3).asInstanceOf[String].toLong
+            if (c > maxCommit)
+              maxCommit = c
+          })
+          rows.foreach(r => {
+            val c = r(3).asInstanceOf[String].toLong
+            if (c != maxCommit) {
+              val f = r(2).asInstanceOf[String].split("_")(0)
+              if (!fileToDeleteKeyMap.contains(f)) {
+                fileToDeleteKeyMap(f) = HashSet[String]()
+              }
+              fileToDeleteKeyMap(f).add(key)
+            }
+          })
+
+        case DeDupeType.UPSERT_TYPE =>
+          /*
+          This corresponds to the case where duplicates got created as a 
result of inserts as well as updates,
+          i.e few duplicate records have been updated, while others were never 
updated.
+           */
+          var maxCommit = -1L
+
+          rows.foreach(r => {
+            val c = r(3).asInstanceOf[String].toLong
+            if (c > maxCommit)
+              maxCommit = c
+          })
+          val rowsWithMaxCommit = new ListBuffer[Row]()
+          rows.foreach(r => {
+            val c = r(3).asInstanceOf[String].toLong
+            if (c != maxCommit) {
+              val f = r(2).asInstanceOf[String].split("_")(0)
+              if (!fileToDeleteKeyMap.contains(f)) {
+                fileToDeleteKeyMap(f) = HashSet[String]()
+              }
+              fileToDeleteKeyMap(f).add(key)
+            } else {
+              rowsWithMaxCommit += r
+            }
+          })
+
+          rowsWithMaxCommit.toList.init.foreach(r => {
+            val f = r(2).asInstanceOf[String].split("_")(0)
+            if (!fileToDeleteKeyMap.contains(f)) {
+              fileToDeleteKeyMap(f) = HashSet[String]()
+            }
+            fileToDeleteKeyMap(f).add(key)
+          })
+
+        case _ => throw new IllegalArgumentException("Please provide valid 
type for deduping!")
+      }
     })
+    LOG.debug(s"fileToDeleteKeyMap size: ${fileToDeleteKeyMap.size}, map: 
$fileToDeleteKeyMap")
     fileToDeleteKeyMap
   }
 
 
+

Review comment:
       Why did you add a new line here?

##########
File path: 
hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
##########
@@ -77,16 +77,21 @@ public String deduplicate(
           help = "Spark executor memory") final String sparkMemory,
       @CliOption(key = {"dryrun"},
           help = "Should we actually remove duplicates or just run and store 
result to repairedOutputPath",
-          unspecifiedDefaultValue = "true") final boolean dryRun)
+          unspecifiedDefaultValue = "true") final boolean dryRun,
+      @CliOption(key = {"dedupeType"}, help = "Valid values are - insert_type, 
update_type and upsert_type",
+          unspecifiedDefaultValue = "insert_type") final String dedupeType)
       throws Exception {
+    if (!dedupeType.equals("insert_type") && !dedupeType.equals("update_type") 
&& !dedupeType.equals("upsert_type")) {

Review comment:
       Can we get the representation of `dedupeType` from the `DeDupeType` enum?

##########
File path: 
hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -145,6 +166,60 @@ public void testDeduplicate() throws IOException {
     assertEquals(200, result.count());
   }
 
+  @Test
+  public void testDeduplicateWithUpdates() throws IOException {

Review comment:
       Can we rename the existing `testDeduplicate` to 
`testDeduplicateWithDefault` or `testDeduplicateWithInserts`?

##########
File path: hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
##########
@@ -98,34 +97,92 @@ class DedupeSparkJob(basePath: String,
         ON h.`_hoodie_record_key` = d.dupe_key
                       """
     val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => 
r.getString(0))
-    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
+    getDedupePlan(dupeMap)
+  }
 
-    // Mark all files except the one with latest commits for deletion
+  private def getDedupePlan(dupeMap: Map[String, Buffer[Row]]): 
HashMap[String, HashSet[String]] = {
+    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
     dupeMap.foreach(rt => {
       val (key, rows) = rt
-      var maxCommit = -1L
-
-      rows.foreach(r => {
-        val c = r(3).asInstanceOf[String].toLong
-        if (c > maxCommit)
-          maxCommit = c
-      })
-
-      rows.foreach(r => {
-        val c = r(3).asInstanceOf[String].toLong
-        if (c != maxCommit) {
-          val f = r(2).asInstanceOf[String].split("_")(0)
-          if (!fileToDeleteKeyMap.contains(f)) {
-            fileToDeleteKeyMap(f) = HashSet[String]()
-          }
-          fileToDeleteKeyMap(f).add(key)
-        }
-      })
+
+      dedupeType match {
+        case DeDupeType.UPDATE_TYPE =>
+          /*
+          This corresponds to the case where all duplicates have been updated 
at least once.
+          Once updated, duplicates are bound to have same commit time unless 
forcefully modified.
+          */
+          rows.init.foreach(r => {
+            val f = r(2).asInstanceOf[String].split("_")(0)
+            if (!fileToDeleteKeyMap.contains(f)) {
+              fileToDeleteKeyMap(f) = HashSet[String]()
+            }
+            fileToDeleteKeyMap(f).add(key)
+          })

Review comment:
       Could you separate this from the next case with an empty line?

##########
File path: hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
##########
@@ -75,8 +76,8 @@ public static void main(String[] args) throws Exception {
         returnCode = rollback(jsc, args[3], args[4]);
         break;
       case DEDUPLICATE:
-        assert (args.length == 7);
-        returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], 
args[6]);
+        assert (args.length == 8);
+        returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], 
Boolean.parseBoolean(args[7]), args[6]);

Review comment:
       Ditto: could you move it to the end of the method's parameter list?

##########
File path: 
hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
##########
@@ -77,16 +77,21 @@ public String deduplicate(
           help = "Spark executor memory") final String sparkMemory,
       @CliOption(key = {"dryrun"},
           help = "Should we actually remove duplicates or just run and store 
result to repairedOutputPath",
-          unspecifiedDefaultValue = "true") final boolean dryRun)
+          unspecifiedDefaultValue = "true") final boolean dryRun,
+      @CliOption(key = {"dedupeType"}, help = "Valid values are - insert_type, 
update_type and upsert_type",
+          unspecifiedDefaultValue = "insert_type") final String dedupeType)
       throws Exception {
+    if (!dedupeType.equals("insert_type") && !dedupeType.equals("update_type") 
&& !dedupeType.equals("upsert_type")) {
+      throw new IllegalArgumentException("Please provide valid dedupe type!");
+    }
     if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
       sparkPropertiesPath =
           
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
     sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), 
master, sparkMemory,
-        duplicatedPartitionPath, repairedOutputPath, 
HoodieCLI.getTableMetaClient().getBasePath(),
+        duplicatedPartitionPath, repairedOutputPath, 
HoodieCLI.getTableMetaClient().getBasePath(), dedupeType,

Review comment:
       Can we append this parameter to the end of the parameter list?

##########
File path: hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
##########
@@ -152,7 +209,7 @@ class DedupeSparkJob(basePath: String,
       val newFilePath = new 
Path(s"$repairOutputPath/${fileNameToPathMap(fileName).getName}")
       LOG.info(" Skipping and writing new file for : " + fileName)
       SparkHelpers.skipKeysAndWriteNewFile(instantTime, fs, badFilePath, 
newFilePath, dupeFixPlan(fileName))
-      fs.delete(badFilePath, false)
+      fs.delete(badFilePath, true)

Review comment:
       Can you explain this change?

##########
File path: hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
##########
@@ -98,34 +97,92 @@ class DedupeSparkJob(basePath: String,
         ON h.`_hoodie_record_key` = d.dupe_key
                       """
     val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => 
r.getString(0))
-    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
+    getDedupePlan(dupeMap)
+  }
 
-    // Mark all files except the one with latest commits for deletion
+  private def getDedupePlan(dupeMap: Map[String, Buffer[Row]]): 
HashMap[String, HashSet[String]] = {
+    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
     dupeMap.foreach(rt => {
       val (key, rows) = rt
-      var maxCommit = -1L
-
-      rows.foreach(r => {
-        val c = r(3).asInstanceOf[String].toLong
-        if (c > maxCommit)
-          maxCommit = c
-      })
-
-      rows.foreach(r => {
-        val c = r(3).asInstanceOf[String].toLong
-        if (c != maxCommit) {
-          val f = r(2).asInstanceOf[String].split("_")(0)
-          if (!fileToDeleteKeyMap.contains(f)) {
-            fileToDeleteKeyMap(f) = HashSet[String]()
-          }
-          fileToDeleteKeyMap(f).add(key)
-        }
-      })
+
+      dedupeType match {
+        case DeDupeType.UPDATE_TYPE =>
+          /*
+          This corresponds to the case where all duplicates have been updated 
at least once.
+          Once updated, duplicates are bound to have same commit time unless 
forcefully modified.
+          */
+          rows.init.foreach(r => {
+            val f = r(2).asInstanceOf[String].split("_")(0)
+            if (!fileToDeleteKeyMap.contains(f)) {
+              fileToDeleteKeyMap(f) = HashSet[String]()
+            }
+            fileToDeleteKeyMap(f).add(key)
+          })
+        case DeDupeType.INSERT_TYPE =>
+          /*
+          This corresponds to the case where duplicates got created due to 
INSERT and have never been updated.
+          */
+          var maxCommit = -1L
+
+          rows.foreach(r => {
+            val c = r(3).asInstanceOf[String].toLong
+            if (c > maxCommit)
+              maxCommit = c
+          })
+          rows.foreach(r => {

Review comment:
       Please separate these with an empty line.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to