This is an automated email from the ASF dual-hosted git repository.
uditme pushed a commit to branch release-0.9.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.9.0 by this push:
new d603060 HUDI-1674 (#3488)
d603060 is described below
commit d603060a43032881bb66023ed2d7c07654f5179d
Author: liujinhui <[email protected]>
AuthorDate: Wed Aug 18 13:45:48 2021 +0800
HUDI-1674 (#3488)
---
.../examples/spark/HoodieWriteClientExample.java | 12 +++++-
.../examples/spark/HoodieDataSourceExample.scala | 44 ++++++++++++++++++++--
.../scala/org/apache/hudi/DataSourceOptions.scala | 1 +
.../main/java/org/apache/hudi/QuickstartUtils.java | 2 +
4 files changed, 55 insertions(+), 4 deletions(-)
diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index feddd1b..35e4660 100644
--- a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -47,7 +48,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
-
/**
* Simple examples of {@link SparkRDDWriteClient}.
*
@@ -127,6 +127,16 @@ public class HoodieWriteClientExample {
JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
client.delete(deleteRecords, newCommitTime);
+ // Delete by partition
+ newCommitTime = client.startCommit();
+ client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+ LOG.info("Starting commit " + newCommitTime);
+ // The partitions from which data needs to be deleted
+ List<String> partitionList = toBeDeleted.stream().map(s -> s.getPartitionPath()).distinct().collect(Collectors.toList());
+ List<String> deleteList = recordsSoFar.stream().filter(f -> !partitionList.contains(f.getPartitionPath()))
+   .map(m -> m.getKey().getPartitionPath()).distinct().collect(Collectors.toList());
+ client.deletePartitions(deleteList, newCommitTime);
+
// compaction
if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
Option<String> instant = client.scheduleCompaction(Option.empty());
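For readers of this hunk: deletePartitions can also be handed an explicit partition list rather than one derived from the sample records. A minimal Scala sketch, assuming the SparkRDDWriteClient `client` built earlier in this example and a hypothetical partition path "2020/01/01":

    // Minimal sketch: delete one explicitly named partition.
    // "2020/01/01" is a hypothetical partition path, not from this patch.
    val partitionsToDrop = java.util.Collections.singletonList("2020/01/01")
    val replaceTime = client.startCommit()
    client.startCommitWithTime(replaceTime, HoodieTimeline.REPLACE_COMMIT_ACTION)
    client.deletePartitions(partitionsToDrop, replaceTime)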
diff --git a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala b/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala
index 3072f6f..ada5aea 100644
--- a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala
+++ b/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala
@@ -19,7 +19,7 @@
package org.apache.hudi.examples.spark
import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME, END_INSTANTTIME, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE}
-import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, PARTITIONS_TO_DELETE, OPERATION, DELETE_PARTITION_OPERATION_OPT_VAL, DELETE_OPERATION_OPT_VAL}
import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.common.model.HoodieAvroPayload
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
@@ -61,10 +61,12 @@ object HoodieDataSourceExample {
incrementalQuery(spark, tablePath, tableName)
pointInTimeQuery(spark, tablePath, tableName)
+ delete(spark, tablePath, tableName)
+ deleteByPartition(spark, tablePath, tableName)
+
spark.stop()
}
-
/**
* Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi dataset as below.
*/
@@ -72,7 +74,6 @@ object HoodieDataSourceExample {
val commitTime: String = System.currentTimeMillis().toString
val inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20))
- spark.sparkContext.parallelize(inserts, 2)
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
@@ -130,6 +131,43 @@ object HoodieDataSourceExample {
}
/**
+ * Delete data based on record information.
+ */
+ def delete(spark: SparkSession, tablePath: String, tableName: String): Unit = {
+
+ val roViewDF = spark.read.format("org.apache.hudi").load(tablePath + "/*/*/*/*")
+ roViewDF.createOrReplaceTempView("hudi_ro_table")
+ val df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2")
+
+ df.write.format("org.apache.hudi").
+ options(getQuickstartWriteConfigs).
+ option(PRECOMBINE_FIELD.key, "ts").
+ option(RECORDKEY_FIELD.key, "uuid").
+ option(PARTITIONPATH_FIELD.key, "partitionpath").
+ option(TABLE_NAME.key, tableName).
+ option(OPERATION.key, DELETE_OPERATION_OPT_VAL).
+ mode(Append).
+ save(tablePath)
+ }
+
+ /**
+ * Delete the data of one or more partitions.
+ */
+ def deleteByPartition(spark: SparkSession, tablePath: String, tableName: String): Unit = {
+ val df = spark.emptyDataFrame
+ df.write.format("org.apache.hudi").
+ options(getQuickstartWriteConfigs).
+ option(PRECOMBINE_FIELD.key, "ts").
+ option(RECORDKEY_FIELD.key, "uuid").
+ option(PARTITIONPATH_FIELD.key, "partitionpath").
+ option(TABLE_NAME.key, tableName).
+ option(OPERATION.key, DELETE_PARTITION_OPERATION_OPT_VAL).
+ option(PARTITIONS_TO_DELETE.key(), HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS.mkString(",")).
+ mode(Append).
+ save(tablePath)
+ }
+
+ /**
* Hudi also provides the capability to obtain a stream of records that changed since a given commit timestamp.
* This can be achieved using Hudi's incremental view and providing a begin time from which changes need to be streamed.
* We do not need to specify endTime if we want all changes after the given commit (as is the common case).
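The incremental-query body itself sits outside this hunk; as a rough sketch of what the javadoc above describes, using the read options imported at the top of this file, with `beginTime` standing in for some earlier commit timestamp:

    // Rough sketch: read only records that changed after a hypothetical `beginTime`.
    val beginTime = "20210818134500" // hypothetical earlier commit timestamp
    val incViewDF = spark.read.format("org.apache.hudi").
      option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME.key, beginTime).
      load(tablePath)
    incViewDF.createOrReplaceTempView("hudi_incr_table")
    spark.sql("select * from hudi_incr_table").show()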
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 168e795..5044ab6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -171,6 +171,7 @@ object DataSourceWriteOptions {
val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value
val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value
+ val DELETE_PARTITION_OPERATION_OPT_VAL = WriteOperationType.DELETE_PARTITION.value
val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value
val INSERT_OVERWRITE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE.value
val INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE_TABLE.value
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
index 47b3017..e0929ef 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
@@ -241,6 +241,8 @@ public class QuickstartUtils {
Map<String, String> demoConfigs = new HashMap<>();
demoConfigs.put("hoodie.insert.shuffle.parallelism", "2");
demoConfigs.put("hoodie.upsert.shuffle.parallelism", "2");
+ demoConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2");
+ demoConfigs.put("hoodie.delete.shuffle.parallelism", "2");
return demoConfigs;
}
}
\ No newline at end of file
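With the two added entries, getQuickstartWriteConfigs now seeds shuffle parallelism for bulk-insert and delete alongside the existing insert and upsert settings. A small Scala sketch of checking that, assuming this commit's QuickstartUtils is on the classpath:

    import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs

    // The map is passed wholesale via DataFrameWriter.options(...), so the new
    // parallelism settings apply to bulk-insert and delete writes automatically.
    val cfg = getQuickstartWriteConfigs
    assert(cfg.get("hoodie.bulkinsert.shuffle.parallelism") == "2")
    assert(cfg.get("hoodie.delete.shuffle.parallelism") == "2")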