This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 84b05c8473 [HUDI-4758] Add validations to java spark examples (#6615)
84b05c8473 is described below

commit 84b05c8473aec8ded4fcb2a45f87dd32284b7229
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Sep 21 10:52:08 2022 -0400

    [HUDI-4758] Add validations to java spark examples (#6615)
---
 .../common/HoodieExampleDataGenerator.java         | 59 ++++++++++++++++---
 .../examples/quickstart/HoodieSparkQuickstart.java | 67 +++++++++++++++-------
 2 files changed, 96 insertions(+), 30 deletions(-)

diff --git 
a/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
 
b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
index 4ce11acfa0..004271a329 100644
--- 
a/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
+++ 
b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
@@ -55,11 +55,11 @@ public class HoodieExampleDataGenerator<T extends 
HoodieRecordPayload<T>> {
   public static final String[] DEFAULT_PARTITION_PATHS =
       {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, 
DEFAULT_THIRD_PARTITION_PATH};
   public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": 
\"triprec\",\"fields\": [ "
-          + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", 
\"type\": \"string\"},"
-          + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": 
\"driver\", \"type\": \"string\"},"
-          + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": 
\"begin_lon\", \"type\": \"double\"},"
-          + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": 
\"end_lon\", \"type\": \"double\"},"
-          + "{\"name\":\"fare\",\"type\": \"double\"}]}";
+      + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": 
\"string\"},"
+      + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", 
\"type\": \"string\"},"
+      + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": 
\"begin_lon\", \"type\": \"double\"},"
+      + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", 
\"type\": \"double\"},"
+      + "{\"name\":\"fare\",\"type\": \"double\"}]}";
   public static Schema avroSchema = new 
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
 
   private static final Random RAND = new Random(46474747);
@@ -130,12 +130,36 @@ public class HoodieExampleDataGenerator<T extends 
HoodieRecordPayload<T>> {
     });
   }
 
+  /**
+   * Generates new inserts, across a single partition path. It also updates 
the list of existing keys.
+   */
+  public List<HoodieRecord<T>> generateInsertsOnPartition(String commitTime, 
Integer n, String partitionPath) {
+    return generateInsertsStreamOnPartition(commitTime, n, 
partitionPath).collect(Collectors.toList());
+  }
+
+  /**
+   * Generates new inserts, across a single partition path. It also updates 
the list of existing keys.
+   */
+  public Stream<HoodieRecord<T>> generateInsertsStreamOnPartition(String 
commitTime, Integer n, String partitionPath) {
+    int currSize = getNumExistingKeys();
+
+    return IntStream.range(0, n).boxed().map(i -> {
+      HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), 
partitionPath);
+      KeyPartition kp = new KeyPartition();
+      kp.key = key;
+      kp.partitionPath = partitionPath;
+      existingKeys.put(currSize + i, kp);
+      numExistingKeys++;
+      return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
+    });
+  }
+
   /**
    * Generates new updates, randomly distributed across the keys above. There 
can be duplicates within the returned
    * list
    *
    * @param commitTime Commit Timestamp
-   * @param n Number of updates (including dups)
+   * @param n          Number of updates (including dups)
    * @return list of hoodie record updates
    */
   public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
@@ -148,6 +172,23 @@ public class HoodieExampleDataGenerator<T extends 
HoodieRecordPayload<T>> {
     return updates;
   }
 
+  /**
+   * Generates new updates, one for each of the keys above
+   * list
+   *
+   * @param commitTime Commit Timestamp
+   * @return list of hoodie record updates
+   */
+  public List<HoodieRecord<T>> generateUniqueUpdates(String commitTime) {
+    List<HoodieRecord<T>> updates = new ArrayList<>();
+    for (int i = 0; i < numExistingKeys; i++) {
+      KeyPartition kp = existingKeys.get(i);
+      HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
+      updates.add(record);
+    }
+    return updates;
+  }
+
   public HoodieRecord<T> generateUpdateRecord(HoodieKey key, String 
commitTime) {
     return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
   }
@@ -155,8 +196,8 @@ public class HoodieExampleDataGenerator<T extends 
HoodieRecordPayload<T>> {
   private Option<String> convertToString(HoodieRecord<T> record) {
     try {
       String str = HoodieAvroUtils
-              
.bytesToAvro(((HoodieAvroPayload)record.getData()).getRecordBytes(), avroSchema)
-              .toString();
+          .bytesToAvro(((HoodieAvroPayload) 
record.getData()).getRecordBytes(), avroSchema)
+          .toString();
       str = "{" + str.substring(str.indexOf("\"ts\":"));
       return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + 
record.getPartitionPath() + "\"}"));
     } catch (IOException e) {
@@ -166,7 +207,7 @@ public class HoodieExampleDataGenerator<T extends 
HoodieRecordPayload<T>> {
 
   public List<String> convertToStringList(List<HoodieRecord<T>> records) {
     return 
records.stream().map(this::convertToString).filter(Option::isPresent).map(Option::get)
-            .collect(Collectors.toList());
+        .collect(Collectors.toList());
   }
 
   public int getNumExistingKeys() {
diff --git 
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
 
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
index 5a6db78f88..9c6293fe44 100644
--- 
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
+++ 
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
@@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
 import org.apache.hudi.examples.common.HoodieExampleSparkUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
@@ -65,30 +66,51 @@ public final class HoodieSparkQuickstart {
   public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, 
String tableName, String tablePath) {
     final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new 
HoodieExampleDataGenerator<>();
 
-    insertData(spark, jsc, tablePath, tableName, dataGen);
+    String snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, 
end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table";
+
+    Dataset<Row> insertDf = insertData(spark, jsc, tablePath, tableName, 
dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    assert insertDf.except(spark.sql(snapshotQuery)).count() == 0;
 
-    updateData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> snapshotBeforeUpdate = spark.sql(snapshotQuery);
+    Dataset<Row> updateDf = updateData(spark, jsc, tablePath, tableName, 
dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> snapshotAfterUpdate = spark.sql(snapshotQuery);
+    assert snapshotAfterUpdate.intersect(updateDf).count() == updateDf.count();
+    assert 
snapshotAfterUpdate.except(updateDf).except(snapshotBeforeUpdate).count() == 0;
 
     incrementalQuery(spark, tablePath, tableName);
     pointInTimeQuery(spark, tablePath, tableName);
 
-    delete(spark, tablePath, tableName);
+    Dataset<Row> snapshotBeforeDelete = snapshotAfterUpdate;
+    Dataset<Row> deleteDf = delete(spark, tablePath, tableName);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> snapshotAfterDelete = spark.sql(snapshotQuery);
+    assert snapshotAfterDelete.intersect(deleteDf).count() == 0;
+    assert 
snapshotBeforeDelete.except(deleteDf).except(snapshotAfterDelete).count() == 0;
 
-    insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> snapshotBeforeOverwrite = snapshotAfterDelete;
+    Dataset<Row> overwriteDf = insertOverwriteData(spark, jsc, tablePath, 
tableName, dataGen);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> withoutThirdPartitionDf = 
snapshotBeforeOverwrite.filter("partitionpath != '" + 
HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH + "'");
+    Dataset<Row> expectedDf = withoutThirdPartitionDf.union(overwriteDf);
+    Dataset<Row> snapshotAfterOverwrite = spark.sql(snapshotQuery);
+    assert snapshotAfterOverwrite.except(expectedDf).count() == 0;
+
 
+    Dataset<Row> snapshotBeforeDeleteByPartition = snapshotAfterOverwrite;
     deleteByPartition(spark, tablePath, tableName);
     queryData(spark, jsc, tablePath, tableName, dataGen);
+    Dataset<Row> snapshotAfterDeleteByPartition = spark.sql(snapshotQuery);
+    assert 
snapshotAfterDeleteByPartition.intersect(snapshotBeforeDeleteByPartition.filter("partitionpath
 == '" + HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH + 
"'")).count() == 0;
+    assert snapshotAfterDeleteByPartition.count() == 
snapshotBeforeDeleteByPartition.filter("partitionpath != '" + 
HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'").count();
   }
 
   /**
    * Generate some new trips, load them into a DataFrame and write the 
DataFrame into the Hudi dataset as below.
    */
-  public static void insertData(SparkSession spark, JavaSparkContext jsc, 
String tablePath, String tableName,
-                                HoodieExampleDataGenerator<HoodieAvroPayload> 
dataGen) {
+  public static Dataset<Row> insertData(SparkSession spark, JavaSparkContext 
jsc, String tablePath, String tableName,
+                                        
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
     String commitTime = Long.toString(System.currentTimeMillis());
     List<String> inserts = 
dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
     Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
@@ -101,15 +123,16 @@ public final class HoodieSparkQuickstart {
         .option(TBL_NAME.key(), tableName)
         .mode(Overwrite)
         .save(tablePath);
+    return df;
   }
 
   /**
    * Generate new records, load them into a {@link Dataset} and 
insert-overwrite it into the Hudi dataset
    */
-  public static void insertOverwriteData(SparkSession spark, JavaSparkContext 
jsc, String tablePath, String tableName,
-                                HoodieExampleDataGenerator<HoodieAvroPayload> 
dataGen) {
+  public static Dataset<Row> insertOverwriteData(SparkSession spark, 
JavaSparkContext jsc, String tablePath, String tableName,
+                                                 
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
     String commitTime = Long.toString(System.currentTimeMillis());
-    List<String> inserts = 
dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
+    List<String> inserts = 
dataGen.convertToStringList(dataGen.generateInsertsOnPartition(commitTime, 20, 
HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH));
     Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
 
     df.write().format("org.apache.hudi")
@@ -121,9 +144,9 @@ public final class HoodieSparkQuickstart {
         .option(TBL_NAME.key(), tableName)
         .mode(Append)
         .save(tablePath);
+    return df;
   }
 
-
   /**
    * Load the data files into a DataFrame.
    */
@@ -157,11 +180,11 @@ public final class HoodieSparkQuickstart {
    * This is similar to inserting new data. Generate updates to existing trips 
using the data generator,
    * load into a DataFrame and write DataFrame into the hudi dataset.
    */
-  public static void updateData(SparkSession spark, JavaSparkContext jsc, 
String tablePath, String tableName,
-                                HoodieExampleDataGenerator<HoodieAvroPayload> 
dataGen) {
+  public static Dataset<Row> updateData(SparkSession spark, JavaSparkContext 
jsc, String tablePath, String tableName,
+                                        
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
 
     String commitTime = Long.toString(System.currentTimeMillis());
-    List<String> updates = 
dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10));
+    List<String> updates = 
dataGen.convertToStringList(dataGen.generateUniqueUpdates(commitTime));
     Dataset<Row> df = spark.read().json(jsc.parallelize(updates, 1));
     df.write().format("org.apache.hudi")
         .options(QuickstartUtils.getQuickstartWriteConfigs())
@@ -171,16 +194,18 @@ public final class HoodieSparkQuickstart {
         .option(TBL_NAME.key(), tableName)
         .mode(Append)
         .save(tablePath);
+    return df;
   }
 
   /**
    * Deleta data based in data information.
    */
-  public static void delete(SparkSession spark, String tablePath, String 
tableName) {
+  public static Dataset<Row> delete(SparkSession spark, String tablePath, 
String tableName) {
 
     Dataset<Row> roViewDF = 
spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*");
     roViewDF.createOrReplaceTempView("hudi_ro_table");
-    Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from  
hudi_ro_table limit 2");
+    Dataset<Row> toBeDeletedDf = spark.sql("SELECT begin_lat, begin_lon, 
driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM 
hudi_ro_table limit 2");
+    Dataset<Row> df = toBeDeletedDf.select("uuid", "partitionpath", "ts");
 
     df.write().format("org.apache.hudi")
         .options(QuickstartUtils.getQuickstartWriteConfigs())
@@ -191,10 +216,11 @@ public final class HoodieSparkQuickstart {
         .option("hoodie.datasource.write.operation", 
WriteOperationType.DELETE.value())
         .mode(Append)
         .save(tablePath);
+    return toBeDeletedDf;
   }
 
   /**
-   * Delete the data of a single or multiple partitions.
+   * Delete the data of the first partition.
    */
   public static void deleteByPartition(SparkSession spark, String tablePath, 
String tableName) {
     Dataset<Row> df = spark.emptyDataFrame();
@@ -204,9 +230,8 @@ public final class HoodieSparkQuickstart {
         .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
         .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"partitionpath")
         .option(TBL_NAME.key(), tableName)
-        .option("hoodie.datasource.write.operation", 
WriteOperationType.DELETE.value())
-        .option("hoodie.datasource.write.partitions.to.delete",
-            String.join(", ", 
HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS))
+        .option("hoodie.datasource.write.operation", 
WriteOperationType.DELETE_PARTITION.value())
+        .option("hoodie.datasource.write.partitions.to.delete", 
HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
         .mode(Append)
         .save(tablePath);
   }
@@ -223,7 +248,7 @@ public final class HoodieSparkQuickstart {
             .map((Function<Row, String>) row -> row.getString(0))
             .take(50);
 
-    String beginTime = commits.get(commits.size() - 2); // commit time we are 
interested in
+    String beginTime = commits.get(commits.size() - 1); // commit time we are 
interested in
 
     // incrementally query data
     Dataset<Row> incViewDF = spark
@@ -250,7 +275,7 @@ public final class HoodieSparkQuickstart {
             .map((Function<Row, String>) row -> row.getString(0))
             .take(50);
     String beginTime = "000"; // Represents all commits > this time.
-    String endTime = commits.get(commits.size() - 2); // commit time we are 
interested in
+    String endTime = commits.get(commits.size() - 1); // commit time we are 
interested in
 
     //incrementally query data
     Dataset<Row> incViewDF = spark.read().format("org.apache.hudi")

Reply via email to