This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b7da6cb  [HUDI-2307]  When using delete_partition with ds should not 
rely on the primary key (#3469)
b7da6cb is described below

commit b7da6cb33d27002525e40913dd63f077aeba26f0
Author: liujinhui <[email protected]>
AuthorDate: Sat Aug 14 14:53:39 2021 +0800

    [HUDI-2307]  When using delete_partition with ds should not rely on the 
primary key (#3469)
    
    - Co-authored-by: Sivabalan Narayanan <[email protected]>
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   5 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  11 +-
 .../apache/hudi/HoodieSparkSqlWriterSuite.scala    | 115 +++++++++++----------
 style/scalastyle.xml                               |   2 +-
 4 files changed, 75 insertions(+), 58 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index a534ac5..36c0493 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -325,6 +325,11 @@ object DataSourceWriteOptions {
   @Deprecated
   val INSERT_DROP_DUPS_OPT_KEY = INSERT_DROP_DUPS.key()
 
+  val PARTITIONS_TO_DELETE: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.write.partitions.to.delete")
+    .noDefaultValue()
+    .withDocumentation("Comma separated list of partitions to delete")
+
   val STREAMING_RETRY_CNT: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.streaming.retry.count")
     .defaultValue("3")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f41df94..2c8d33e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -29,8 +29,8 @@ import org.apache.hudi.client.{HoodieWriteResult, 
SparkRDDWriteClient}
 import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, 
TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, 
WriteOperationType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieTimeline}
+import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH, 
BOOTSTRAP_INDEX_CLASS}
@@ -192,7 +192,12 @@ object HoodieSparkSqlWriter {
             }
 
             // Get list of partitions to delete
-            val partitionsToDelete = genericRecords.map(gr => 
keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
+            val partitionsToDelete = parameters.get(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()) match {
+              case Some(partitionsCsv) => // explicit comma separated list; trim entries so "a, b" still matches partition "b"
+                java.util.Arrays.asList(partitionsCsv.split(",").map(_.trim): _*)
+              case None => // no explicit list configured: derive the partitions from the keys of the incoming records
+                genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
+            }
             // Create a HoodieWriteClient & issue the delete.
             val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
               null, path.get, tblName,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
index e767f8a..1ff30e2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, 
HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -679,61 +679,68 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
     }
   }
 
-  test("test delete partitions") {
-    initSparkContext("test_delete_partitions")
-    val path = 
java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
-    try {
-      val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
-      val fooTableModifier = getCommonParams(path, hoodieFooTableName, 
HoodieTableType.COPY_ON_WRITE.name())
-      val fooTableParams = 
HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-      val schema = DataSourceTestUtils.getStructTypeExampleSchema
-      val structType = 
AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      val records = DataSourceTestUtils.generateRandomRows(10)
-      val recordsSeq = convertRowListToSeq(records)
-      val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-      // write to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, 
fooTableParams, df1)
+  List(true, false)
+    .foreach(usePartitionsToDeleteConfig => {
+      test("test delete partitions for " + usePartitionsToDeleteConfig) {
+        initSparkContext("test_delete_partitions_" + 
usePartitionsToDeleteConfig)
+        val path = 
java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
+        try {
+          val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
+          val fooTableModifier = getCommonParams(path, hoodieFooTableName, 
HoodieTableType.COPY_ON_WRITE.name())
+          var fooTableParams = 
HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+          val schema = DataSourceTestUtils.getStructTypeExampleSchema
+          val structType = 
AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          val records = DataSourceTestUtils.generateRandomRows(10)
+          val recordsSeq = convertRowListToSeq(records)
+          val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), 
structType)
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, 
fooTableParams, df1)
 
-      val snapshotDF1 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(10, snapshotDF1.count())
-      // remove metadata columns so that expected and actual DFs can be 
compared as is
-      val trimmedDf1 = dropMetaFields(snapshotDF1)
-      assert(df1.except(trimmedDf1).count() == 0)
-
-      // issue updates so that log files are created for MOR table
-      var updatesSeq = 
convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
-      var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), 
structType)
-      // write updates to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, 
updatesDf)
-      val snapshotDF2 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(10, snapshotDF2.count())
+          val snapshotDF1 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF1.count())
+          // remove metadata columns so that expected and actual DFs can be 
compared as is
+          val trimmedDf1 = dropMetaFields(snapshotDF1)
+          assert(df1.except(trimmedDf1).count() == 0)
 
-      // remove metadata columns so that expected and actual DFs can be 
compared as is
-      val trimmedDf2 = dropMetaFields(snapshotDF2)
-      // ensure 2nd batch of updates matches.
-      assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
-
-      // delete partitions
-      val recordsToDelete = df1.filter(entry => {
-        val partitionPath : String = entry.getString(1)
-        
partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) || 
partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
-      })
-      val updatedParams = 
fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), 
WriteOperationType.DELETE_PARTITION.name())
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, 
recordsToDelete)
-
-      val snapshotDF3 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(0, snapshotDF3.filter(entry => {
-        val partitionPath = entry.getString(3)
-        
!partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
-      }).count())
-    } finally {
-      spark.stop()
-      FileUtils.deleteDirectory(path.toFile)
-    }
-  }
+          // issue updates against the existing records (this table is COPY_ON_WRITE, so no MOR log files are involved)
+          var updatesSeq = 
convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
+          var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), 
structType)
+          // write updates to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, 
fooTableParams, updatesDf)
+          val snapshotDF2 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF2.count())
+
+          // remove metadata columns so that expected and actual DFs can be 
compared as is
+          val trimmedDf2 = dropMetaFields(snapshotDF2)
+          // ensure 2nd batch of updates matches.
+          assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 
0)
+
+          if (usePartitionsToDeleteConfig) { // Map.updated returns a new map, so reassign; list every partition the final assertion expects gone
+            fooTableParams = fooTableParams.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "," + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
+          }
+          // delete partitions; the records passed to the writer still contain the primary key
+          val recordsToDelete = df1.filter(entry => {
+            val partitionPath : String = entry.getString(1)
+            
partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) ||
+              
partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
+          })
+          val updatedParams = 
fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), 
WriteOperationType.DELETE_PARTITION.name())
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, 
updatedParams, recordsToDelete)
+
+          val snapshotDF3 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(0, snapshotDF3.filter(entry => {
+            val partitionPath = entry.getString(3)
+            
!partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+          }).count())
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
+      }
+    })
 
   def dropMetaFields(df: Dataset[Row]) : Dataset[Row] = {
     
df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
diff --git a/style/scalastyle.xml b/style/scalastyle.xml
index fa2c4d3..89306f3 100644
--- a/style/scalastyle.xml
+++ b/style/scalastyle.xml
@@ -27,7 +27,7 @@
  <check level="error" class="org.scalastyle.file.FileTabChecker" 
enabled="true"/>
  <check level="error" class="org.scalastyle.file.FileLengthChecker" 
enabled="true">
   <parameters>
-   <parameter name="maxFileLength"><![CDATA[800]]></parameter>
+   <parameter name="maxFileLength"><![CDATA[900]]></parameter>
   </parameters>
  </check>
  <check level="error" 
class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>

Reply via email to