This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 96892b31e1 [HUDI-4577] Adding test coverage for `DELETE FROM`, Spark
Quickstart guide (#6318)
96892b31e1 is described below
commit 96892b31e18a145e90cb842df804fdeacc498ab7
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Tue Aug 9 10:51:53 2022 -0700
[HUDI-4577] Adding test coverage for `DELETE FROM`, Spark Quickstart guide
(#6318)
---
.../examples/quickstart/HoodieSparkQuickstart.java | 55 +++++++++++---
.../quickstart/TestHoodieSparkQuickstart.java | 30 ++++----
.../spark/sql/hudi/TestDeleteFromTable.scala | 83 ++++++++++++++++++++++
hudi-utilities/src/test/resources/hive-site.xml | 35 +++++++++
4 files changed, 179 insertions(+), 24 deletions(-)
diff --git
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
index 9f8e29d687..5a6db78f88 100644
---
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
+++
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
@@ -55,18 +55,33 @@ public final class HoodieSparkQuickstart {
SparkConf sparkConf =
HoodieExampleSparkUtils.defaultSparkConf("hoodie-client-example");
try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
- final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new
HoodieExampleDataGenerator<>();
+ runQuickstart(jsc, spark, tableName, tablePath);
+ }
+ }
- insertData(spark, jsc, tablePath, tableName, dataGen);
- updateData(spark, jsc, tablePath, tableName, dataGen);
- queryData(spark, jsc, tablePath, tableName, dataGen);
+ /**
+ * Visible for testing
+ */
+ public static void runQuickstart(JavaSparkContext jsc, SparkSession spark,
String tableName, String tablePath) {
+ final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new
HoodieExampleDataGenerator<>();
- incrementalQuery(spark, tablePath, tableName);
- pointInTimeQuery(spark, tablePath, tableName);
+ insertData(spark, jsc, tablePath, tableName, dataGen);
+ queryData(spark, jsc, tablePath, tableName, dataGen);
- delete(spark, tablePath, tableName);
- deleteByPartition(spark, tablePath, tableName);
- }
+ updateData(spark, jsc, tablePath, tableName, dataGen);
+ queryData(spark, jsc, tablePath, tableName, dataGen);
+
+ incrementalQuery(spark, tablePath, tableName);
+ pointInTimeQuery(spark, tablePath, tableName);
+
+ delete(spark, tablePath, tableName);
+ queryData(spark, jsc, tablePath, tableName, dataGen);
+
+ insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
+ queryData(spark, jsc, tablePath, tableName, dataGen);
+
+ deleteByPartition(spark, tablePath, tableName);
+ queryData(spark, jsc, tablePath, tableName, dataGen);
}
/**
@@ -77,6 +92,7 @@ public final class HoodieSparkQuickstart {
String commitTime = Long.toString(System.currentTimeMillis());
List<String> inserts =
dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
+
df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
.option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
@@ -87,6 +103,27 @@ public final class HoodieSparkQuickstart {
.save(tablePath);
}
+ /**
+ * Generate new records, load them into a {@link Dataset} and
insert-overwrite it into the Hudi dataset
+ */
+ public static void insertOverwriteData(SparkSession spark, JavaSparkContext
jsc, String tablePath, String tableName,
+ HoodieExampleDataGenerator<HoodieAvroPayload>
dataGen) {
+ String commitTime = Long.toString(System.currentTimeMillis());
+ List<String> inserts =
dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
+ Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
+
+ df.write().format("org.apache.hudi")
+ .options(QuickstartUtils.getQuickstartWriteConfigs())
+ .option("hoodie.datasource.write.operation",
WriteOperationType.INSERT_OVERWRITE.name())
+ .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
+ .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
+ .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"partitionpath")
+ .option(TBL_NAME.key(), tableName)
+ .mode(Append)
+ .save(tablePath);
+ }
+
+
/**
* Load the data files into a DataFrame.
*/
diff --git
a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
index 212dcc4409..b9ab120460 100644
---
a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
+++
b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
@@ -36,12 +36,22 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.nio.file.Paths;
+import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.delete;
+import static
org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.deleteByPartition;
+import static
org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.incrementalQuery;
+import static
org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertData;
+import static
org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertOverwriteData;
+import static
org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.pointInTimeQuery;
+import static
org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.queryData;
+import static
org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.runQuickstart;
+import static
org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.updateData;
+
public class TestHoodieSparkQuickstart implements SparkProvider {
- protected static transient HoodieSparkEngineContext context;
+ protected static HoodieSparkEngineContext context;
- private static transient SparkSession spark;
- private static transient SQLContext sqlContext;
- private static transient JavaSparkContext jsc;
+ private static SparkSession spark;
+ private static SQLContext sqlContext;
+ private static JavaSparkContext jsc;
/**
* An indicator of the initialization status.
@@ -50,8 +60,6 @@ public class TestHoodieSparkQuickstart implements
SparkProvider {
@TempDir
protected java.nio.file.Path tempDir;
- private static final HoodieExampleDataGenerator<HoodieAvroPayload> DATA_GEN
= new HoodieExampleDataGenerator<>();
-
@Override
public SparkSession spark() {
return spark;
@@ -100,15 +108,7 @@ public class TestHoodieSparkQuickstart implements
SparkProvider {
String tablePath = tablePath(tableName);
try {
- HoodieSparkQuickstart.insertData(spark, jsc, tablePath, tableName,
DATA_GEN);
- HoodieSparkQuickstart.updateData(spark, jsc, tablePath, tableName,
DATA_GEN);
-
- HoodieSparkQuickstart.queryData(spark, jsc, tablePath, tableName,
DATA_GEN);
- HoodieSparkQuickstart.incrementalQuery(spark, tablePath, tableName);
- HoodieSparkQuickstart.pointInTimeQuery(spark, tablePath, tableName);
-
- HoodieSparkQuickstart.delete(spark, tablePath, tableName);
- HoodieSparkQuickstart.deleteByPartition(spark, tablePath, tableName);
+ runQuickstart(jsc, spark, tableName, tablePath);
} finally {
Utils.deleteRecursively(new File(tablePath));
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteFromTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteFromTable.scala
new file mode 100644
index 0000000000..a972f835e8
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteFromTable.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestDeleteFromTable extends HoodieSparkSqlTestBase {
+
+ test("Test deleting from table") {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) USING hudi
+ | tblproperties (
+ | primaryKey = 'id',
+ | tableType = '$tableType'
+ | )
+ | PARTITIONED BY (dt)
+ | LOCATION '${tmp.getCanonicalPath}'
+ """.stripMargin)
+
+      // NOTE: Do not use field aliases here; the partition field must be
placed last.
+ spark.sql(
+ s"""
+ | INSERT INTO $tableName VALUES
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3', 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+
+ // Delete single row
+ spark.sql(s"DELETE FROM $tableName WHERE id = 1")
+
+ checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")(
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+
+ // Try deleting non-existent row
+ spark.sql(s"DELETE FROM $tableName WHERE id = 1")
+
+ checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")(
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+
+ // Delete record identified by some field other than the primary-key
+ spark.sql(s"DELETE FROM $tableName WHERE name = 'a2'")
+
+ checkAnswer(s"SELECT id, name, price, ts, dt FROM $tableName")(
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ }
+ }
+ }
+}
diff --git a/hudi-utilities/src/test/resources/hive-site.xml
b/hudi-utilities/src/test/resources/hive-site.xml
new file mode 100644
index 0000000000..4866230d27
--- /dev/null
+++ b/hudi-utilities/src/test/resources/hive-site.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+ <property>
+ <name>hive.server2.enable.doAs</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.schema.verification</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>datanucleus.schema.autoCreateTables</name>
+ <value>true</value>
+ </property>
+</configuration>