This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 84b05c8473 [HUDI-4758] Add validations to java spark examples (#6615)
84b05c8473 is described below
commit 84b05c8473aec8ded4fcb2a45f87dd32284b7229
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Sep 21 10:52:08 2022 -0400
[HUDI-4758] Add validations to java spark examples (#6615)
---
.../common/HoodieExampleDataGenerator.java | 59 ++++++++++++++++---
.../examples/quickstart/HoodieSparkQuickstart.java | 67 +++++++++++++++-------
2 files changed, 96 insertions(+), 30 deletions(-)
diff --git
a/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
index 4ce11acfa0..004271a329 100644
---
a/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
+++
b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
@@ -55,11 +55,11 @@ public class HoodieExampleDataGenerator<T extends
HoodieRecordPayload<T>> {
public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH,
DEFAULT_THIRD_PARTITION_PATH};
public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\":
\"triprec\",\"fields\": [ "
- + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\",
\"type\": \"string\"},"
- + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\":
\"driver\", \"type\": \"string\"},"
- + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\":
\"begin_lon\", \"type\": \"double\"},"
- + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\":
\"end_lon\", \"type\": \"double\"},"
- + "{\"name\":\"fare\",\"type\": \"double\"}]}";
+ + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\":
\"string\"},"
+ + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\",
\"type\": \"string\"},"
+ + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\":
\"begin_lon\", \"type\": \"double\"},"
+ + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\",
\"type\": \"double\"},"
+ + "{\"name\":\"fare\",\"type\": \"double\"}]}";
public static Schema avroSchema = new
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
private static final Random RAND = new Random(46474747);
@@ -130,12 +130,36 @@ public class HoodieExampleDataGenerator<T extends
HoodieRecordPayload<T>> {
});
}
+ /**
+ * Generates new inserts, across a single partition path. It also updates
the list of existing keys.
+ */
+ public List<HoodieRecord<T>> generateInsertsOnPartition(String commitTime,
Integer n, String partitionPath) {
+ return generateInsertsStreamOnPartition(commitTime, n,
partitionPath).collect(Collectors.toList());
+ }
+
+ /**
+ * Generates new inserts, across a single partition path. It also updates
the list of existing keys.
+ */
+ public Stream<HoodieRecord<T>> generateInsertsStreamOnPartition(String
commitTime, Integer n, String partitionPath) {
+ int currSize = getNumExistingKeys();
+
+ return IntStream.range(0, n).boxed().map(i -> {
+ HoodieKey key = new HoodieKey(UUID.randomUUID().toString(),
partitionPath);
+ KeyPartition kp = new KeyPartition();
+ kp.key = key;
+ kp.partitionPath = partitionPath;
+ existingKeys.put(currSize + i, kp);
+ numExistingKeys++;
+ return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
+ });
+ }
+
/**
* Generates new updates, randomly distributed across the keys above. There
can be duplicates within the returned
* list
*
* @param commitTime Commit Timestamp
- * @param n Number of updates (including dups)
+ * @param n Number of updates (including dups)
* @return list of hoodie record updates
*/
public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
@@ -148,6 +172,23 @@ public class HoodieExampleDataGenerator<T extends
HoodieRecordPayload<T>> {
return updates;
}
+ /**
+ * Generates new updates, one for each of the keys above
+ * list
+ *
+ * @param commitTime Commit Timestamp
+ * @return list of hoodie record updates
+ */
+ public List<HoodieRecord<T>> generateUniqueUpdates(String commitTime) {
+ List<HoodieRecord<T>> updates = new ArrayList<>();
+ for (int i = 0; i < numExistingKeys; i++) {
+ KeyPartition kp = existingKeys.get(i);
+ HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
+ updates.add(record);
+ }
+ return updates;
+ }
+
public HoodieRecord<T> generateUpdateRecord(HoodieKey key, String
commitTime) {
return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
}
@@ -155,8 +196,8 @@ public class HoodieExampleDataGenerator<T extends
HoodieRecordPayload<T>> {
private Option<String> convertToString(HoodieRecord<T> record) {
try {
String str = HoodieAvroUtils
-
.bytesToAvro(((HoodieAvroPayload)record.getData()).getRecordBytes(), avroSchema)
- .toString();
+ .bytesToAvro(((HoodieAvroPayload)
record.getData()).getRecordBytes(), avroSchema)
+ .toString();
str = "{" + str.substring(str.indexOf("\"ts\":"));
return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" +
record.getPartitionPath() + "\"}"));
} catch (IOException e) {
@@ -166,7 +207,7 @@ public class HoodieExampleDataGenerator<T extends
HoodieRecordPayload<T>> {
public List<String> convertToStringList(List<HoodieRecord<T>> records) {
return
records.stream().map(this::convertToString).filter(Option::isPresent).map(Option::get)
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
}
public int getNumExistingKeys() {
diff --git
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
index 5a6db78f88..9c6293fe44 100644
---
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
+++
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
@@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.examples.common.HoodieExampleSparkUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -65,30 +66,51 @@ public final class HoodieSparkQuickstart {
public static void runQuickstart(JavaSparkContext jsc, SparkSession spark,
String tableName, String tablePath) {
final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new
HoodieExampleDataGenerator<>();
- insertData(spark, jsc, tablePath, tableName, dataGen);
+ String snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat,
end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table";
+
+ Dataset<Row> insertDf = insertData(spark, jsc, tablePath, tableName,
dataGen);
queryData(spark, jsc, tablePath, tableName, dataGen);
+ assert insertDf.except(spark.sql(snapshotQuery)).count() == 0;
- updateData(spark, jsc, tablePath, tableName, dataGen);
+ Dataset<Row> snapshotBeforeUpdate = spark.sql(snapshotQuery);
+ Dataset<Row> updateDf = updateData(spark, jsc, tablePath, tableName,
dataGen);
queryData(spark, jsc, tablePath, tableName, dataGen);
+ Dataset<Row> snapshotAfterUpdate = spark.sql(snapshotQuery);
+ assert snapshotAfterUpdate.intersect(updateDf).count() == updateDf.count();
+ assert
snapshotAfterUpdate.except(updateDf).except(snapshotBeforeUpdate).count() == 0;
incrementalQuery(spark, tablePath, tableName);
pointInTimeQuery(spark, tablePath, tableName);
- delete(spark, tablePath, tableName);
+ Dataset<Row> snapshotBeforeDelete = snapshotAfterUpdate;
+ Dataset<Row> deleteDf = delete(spark, tablePath, tableName);
queryData(spark, jsc, tablePath, tableName, dataGen);
+ Dataset<Row> snapshotAfterDelete = spark.sql(snapshotQuery);
+ assert snapshotAfterDelete.intersect(deleteDf).count() == 0;
+ assert
snapshotBeforeDelete.except(deleteDf).except(snapshotAfterDelete).count() == 0;
- insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
+ Dataset<Row> snapshotBeforeOverwrite = snapshotAfterDelete;
+ Dataset<Row> overwriteDf = insertOverwriteData(spark, jsc, tablePath,
tableName, dataGen);
queryData(spark, jsc, tablePath, tableName, dataGen);
+ Dataset<Row> withoutThirdPartitionDf =
snapshotBeforeOverwrite.filter("partitionpath != '" +
HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH + "'");
+ Dataset<Row> expectedDf = withoutThirdPartitionDf.union(overwriteDf);
+ Dataset<Row> snapshotAfterOverwrite = spark.sql(snapshotQuery);
+ assert snapshotAfterOverwrite.except(expectedDf).count() == 0;
+
+ Dataset<Row> snapshotBeforeDeleteByPartition = snapshotAfterOverwrite;
deleteByPartition(spark, tablePath, tableName);
queryData(spark, jsc, tablePath, tableName, dataGen);
+ Dataset<Row> snapshotAfterDeleteByPartition = spark.sql(snapshotQuery);
+ assert
snapshotAfterDeleteByPartition.intersect(snapshotBeforeDeleteByPartition.filter("partitionpath
== '" + HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH +
"'")).count() == 0;
+ assert snapshotAfterDeleteByPartition.count() ==
snapshotBeforeDeleteByPartition.filter("partitionpath != '" +
HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'").count();
}
/**
* Generate some new trips, load them into a DataFrame and write the
DataFrame into the Hudi dataset as below.
*/
- public static void insertData(SparkSession spark, JavaSparkContext jsc,
String tablePath, String tableName,
- HoodieExampleDataGenerator<HoodieAvroPayload>
dataGen) {
+ public static Dataset<Row> insertData(SparkSession spark, JavaSparkContext
jsc, String tablePath, String tableName,
+
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
String commitTime = Long.toString(System.currentTimeMillis());
List<String> inserts =
dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
@@ -101,15 +123,16 @@ public final class HoodieSparkQuickstart {
.option(TBL_NAME.key(), tableName)
.mode(Overwrite)
.save(tablePath);
+ return df;
}
/**
* Generate new records, load them into a {@link Dataset} and
insert-overwrite it into the Hudi dataset
*/
- public static void insertOverwriteData(SparkSession spark, JavaSparkContext
jsc, String tablePath, String tableName,
- HoodieExampleDataGenerator<HoodieAvroPayload>
dataGen) {
+ public static Dataset<Row> insertOverwriteData(SparkSession spark,
JavaSparkContext jsc, String tablePath, String tableName,
+
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
String commitTime = Long.toString(System.currentTimeMillis());
- List<String> inserts =
dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
+ List<String> inserts =
dataGen.convertToStringList(dataGen.generateInsertsOnPartition(commitTime, 20,
HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH));
Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
df.write().format("org.apache.hudi")
@@ -121,9 +144,9 @@ public final class HoodieSparkQuickstart {
.option(TBL_NAME.key(), tableName)
.mode(Append)
.save(tablePath);
+ return df;
}
-
/**
* Load the data files into a DataFrame.
*/
@@ -157,11 +180,11 @@ public final class HoodieSparkQuickstart {
* This is similar to inserting new data. Generate updates to existing trips
using the data generator,
* load into a DataFrame and write DataFrame into the hudi dataset.
*/
- public static void updateData(SparkSession spark, JavaSparkContext jsc,
String tablePath, String tableName,
- HoodieExampleDataGenerator<HoodieAvroPayload>
dataGen) {
+ public static Dataset<Row> updateData(SparkSession spark, JavaSparkContext
jsc, String tablePath, String tableName,
+
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
String commitTime = Long.toString(System.currentTimeMillis());
- List<String> updates =
dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10));
+ List<String> updates =
dataGen.convertToStringList(dataGen.generateUniqueUpdates(commitTime));
Dataset<Row> df = spark.read().json(jsc.parallelize(updates, 1));
df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
@@ -171,16 +194,18 @@ public final class HoodieSparkQuickstart {
.option(TBL_NAME.key(), tableName)
.mode(Append)
.save(tablePath);
+ return df;
}
/**
* Deleta data based in data information.
*/
- public static void delete(SparkSession spark, String tablePath, String
tableName) {
+ public static Dataset<Row> delete(SparkSession spark, String tablePath,
String tableName) {
Dataset<Row> roViewDF =
spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*");
roViewDF.createOrReplaceTempView("hudi_ro_table");
- Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from
hudi_ro_table limit 2");
+ Dataset<Row> toBeDeletedDf = spark.sql("SELECT begin_lat, begin_lon,
driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM
hudi_ro_table limit 2");
+ Dataset<Row> df = toBeDeletedDf.select("uuid", "partitionpath", "ts");
df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
@@ -191,10 +216,11 @@ public final class HoodieSparkQuickstart {
.option("hoodie.datasource.write.operation",
WriteOperationType.DELETE.value())
.mode(Append)
.save(tablePath);
+ return toBeDeletedDf;
}
/**
- * Delete the data of a single or multiple partitions.
+ * Delete the data of the first partition.
*/
public static void deleteByPartition(SparkSession spark, String tablePath,
String tableName) {
Dataset<Row> df = spark.emptyDataFrame();
@@ -204,9 +230,8 @@ public final class HoodieSparkQuickstart {
.option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"partitionpath")
.option(TBL_NAME.key(), tableName)
- .option("hoodie.datasource.write.operation",
WriteOperationType.DELETE.value())
- .option("hoodie.datasource.write.partitions.to.delete",
- String.join(", ",
HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS))
+ .option("hoodie.datasource.write.operation",
WriteOperationType.DELETE_PARTITION.value())
+ .option("hoodie.datasource.write.partitions.to.delete",
HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.mode(Append)
.save(tablePath);
}
@@ -223,7 +248,7 @@ public final class HoodieSparkQuickstart {
.map((Function<Row, String>) row -> row.getString(0))
.take(50);
- String beginTime = commits.get(commits.size() - 2); // commit time we are
interested in
+ String beginTime = commits.get(commits.size() - 1); // commit time we are
interested in
// incrementally query data
Dataset<Row> incViewDF = spark
@@ -250,7 +275,7 @@ public final class HoodieSparkQuickstart {
.map((Function<Row, String>) row -> row.getString(0))
.take(50);
String beginTime = "000"; // Represents all commits > this time.
- String endTime = commits.get(commits.size() - 2); // commit time we are
interested in
+ String endTime = commits.get(commits.size() - 1); // commit time we are
interested in
//incrementally query data
Dataset<Row> incViewDF = spark.read().format("org.apache.hudi")