This is an automated email from the ASF dual-hosted git repository.

uditme pushed a commit to branch release-0.9.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-0.9.0 by this push:
     new d603060  HUDI-1674 (#3488)
d603060 is described below

commit d603060a43032881bb66023ed2d7c07654f5179d
Author: liujinhui <[email protected]>
AuthorDate: Wed Aug 18 13:45:48 2021 +0800

    HUDI-1674 (#3488)
---
 .../examples/spark/HoodieWriteClientExample.java   | 12 +++++-
 .../examples/spark/HoodieDataSourceExample.scala   | 44 ++++++++++++++++++++--
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  1 +
 .../main/java/org/apache/hudi/QuickstartUtils.java |  2 +
 4 files changed, 55 insertions(+), 4 deletions(-)

diff --git 
a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
 
b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index feddd1b..35e4660 100644
--- 
a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ 
b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -47,7 +48,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-
 /**
  * Simple examples of #{@link SparkRDDWriteClient}.
  *
@@ -127,6 +127,16 @@ public class HoodieWriteClientExample {
       JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
       client.delete(deleteRecords, newCommitTime);
 
+      // Delete by partition
+      newCommitTime = client.startCommit();
+      client.startCommitWithTime(newCommitTime, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+      LOG.info("Starting commit " + newCommitTime);
+      // The partition where the data needs to be deleted
+      List<String> partitionList = toBeDeleted.stream().map(s -> 
s.getPartitionPath()).distinct().collect(Collectors.toList());
+      List<String> deleteList = recordsSoFar.stream().filter(f -> 
!partitionList.contains(f.getPartitionPath()))
+          .map(m -> 
m.getKey().getPartitionPath()).distinct().collect(Collectors.toList());
+      client.deletePartitions(deleteList, newCommitTime);
+
       // compaction
       if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) 
{
         Option<String> instant = client.scheduleCompaction(Option.empty());
diff --git 
a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala
 
b/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala
index 3072f6f..ada5aea 100644
--- 
a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala
+++ 
b/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala
@@ -19,7 +19,7 @@
 package org.apache.hudi.examples.spark
 
 import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME, 
END_INSTANTTIME, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE}
-import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, 
PRECOMBINE_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, 
PRECOMBINE_FIELD, RECORDKEY_FIELD, PARTITIONS_TO_DELETE, OPERATION, 
DELETE_PARTITION_OPERATION_OPT_VAL, DELETE_OPERATION_OPT_VAL}
 import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
 import org.apache.hudi.common.model.HoodieAvroPayload
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
@@ -61,10 +61,12 @@ object HoodieDataSourceExample {
     incrementalQuery(spark, tablePath, tableName)
     pointInTimeQuery(spark, tablePath, tableName)
 
+    delete(spark, tablePath, tableName)
+    deleteByPartition(spark, tablePath, tableName)
+
     spark.stop()
   }
 
-
   /**
     * Generate some new trips, load them into a DataFrame and write the 
DataFrame into the Hudi dataset as below.
     */
@@ -72,7 +74,6 @@ object HoodieDataSourceExample {
 
     val commitTime: String = System.currentTimeMillis().toString
     val inserts = 
dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20))
-    spark.sparkContext.parallelize(inserts, 2)
     val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
     df.write.format("org.apache.hudi").
         options(getQuickstartWriteConfigs).
@@ -130,6 +131,43 @@ object HoodieDataSourceExample {
   }
 
   /**
+   * Deleta data based in data information.
+   */
+  def delete(spark: SparkSession, tablePath: String, tableName: String): Unit 
= {
+
+    val roViewDF = spark.read.format("org.apache.hudi").load(tablePath + 
"/*/*/*/*")
+    roViewDF.createOrReplaceTempView("hudi_ro_table")
+    val df = spark.sql("select uuid, partitionpath, ts from  hudi_ro_table 
limit 2")
+
+    df.write.format("org.apache.hudi").
+      options(getQuickstartWriteConfigs).
+      option(PRECOMBINE_FIELD.key, "ts").
+      option(RECORDKEY_FIELD.key, "uuid").
+      option(PARTITIONPATH_FIELD.key, "partitionpath").
+      option(TABLE_NAME.key, tableName).
+      option(OPERATION.key, DELETE_OPERATION_OPT_VAL).
+      mode(Append).
+      save(tablePath)
+  }
+
+  /**
+   *  Delete the data of a single or multiple partitions.
+   */
+  def deleteByPartition(spark: SparkSession, tablePath: String, tableName: 
String): Unit = {
+    val df = spark.emptyDataFrame
+    df.write.format("org.apache.hudi").
+      options(getQuickstartWriteConfigs).
+      option(PRECOMBINE_FIELD.key, "ts").
+      option(RECORDKEY_FIELD.key, "uuid").
+      option(PARTITIONPATH_FIELD.key, "partitionpath").
+      option(TABLE_NAME.key, tableName).
+      option(OPERATION.key, DELETE_PARTITION_OPERATION_OPT_VAL).
+      option(PARTITIONS_TO_DELETE.key(), 
HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS.mkString(",")).
+      mode(Append).
+      save(tablePath)
+  }
+
+  /**
     * Hudi also provides capability to obtain a stream of records that changed 
since given commit timestamp.
     * This can be achieved using Hudi’s incremental view and providing a begin 
time from which changes need to be streamed.
     * We do not need to specify endTime, if we want all changes after the 
given commit (as is the common case).
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 168e795..5044ab6 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -171,6 +171,7 @@ object DataSourceWriteOptions {
   val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value
   val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
   val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value
+  val DELETE_PARTITION_OPERATION_OPT_VAL = 
WriteOperationType.DELETE_PARTITION.value
   val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value
   val INSERT_OVERWRITE_OPERATION_OPT_VAL = 
WriteOperationType.INSERT_OVERWRITE.value
   val INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL = 
WriteOperationType.INSERT_OVERWRITE_TABLE.value
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
index 47b3017..e0929ef 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
@@ -241,6 +241,8 @@ public class QuickstartUtils {
     Map<String, String> demoConfigs = new HashMap<>();
     demoConfigs.put("hoodie.insert.shuffle.parallelism", "2");
     demoConfigs.put("hoodie.upsert.shuffle.parallelism", "2");
+    demoConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2");
+    demoConfigs.put("hoodie.delete.shuffle.parallelism", "2");
     return demoConfigs;
   }
 }
\ No newline at end of file

Reply via email to