This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9913de4ebfa2da09aa5352d814c3078e4b493441
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Nov 29 08:47:29 2022 -0500

    [HUDI-5269] Enhancing spark-sql write tests for some of the core user flows 
(#7230)
    
    Add good test coverage for some of the core user flows w/ spark data source 
writes.
---
 .../common/testutils/HoodieTestDataGenerator.java  | 181 +++++++--
 .../hudi/functional/TestSparkSqlCoreFlow.scala     | 441 +++++++++++++++++++++
 2 files changed, 580 insertions(+), 42 deletions(-)

diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index e1aa0c7a01d..5cedea06342 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -126,6 +126,9 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   public static final String TRIP_FLATTENED_SCHEMA =
       TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
 
+  public static final String TRIP_NESTED_EXAMPLE_SCHEMA =
+      TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+
   public static final String TRIP_SCHEMA = 
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":["
       + 
"{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
       + 
"{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":
 \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
@@ -139,6 +142,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
 
 
   public static final Schema AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+  public static final Schema NESTED_AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_NESTED_EXAMPLE_SCHEMA);
   public static final TypeDescription ORC_SCHEMA = 
AvroOrcUtils.createOrcSchema(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA));
   public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
       HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
@@ -159,13 +163,21 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     this(seed, DEFAULT_PARTITION_PATHS, new HashMap<>());
   }
 
+  public HoodieTestDataGenerator(String schema, long seed) {
+    this(schema, seed, DEFAULT_PARTITION_PATHS, new HashMap<>());
+  }
+
   public HoodieTestDataGenerator(long seed, String[] partitionPaths, 
Map<Integer, KeyPartition> keyPartitionMap) {
+    this(TRIP_EXAMPLE_SCHEMA, seed, partitionPaths, keyPartitionMap);
+  }
+
+  public HoodieTestDataGenerator(String schema, long seed, String[] 
partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
     this.rand = new Random(seed);
     this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
     this.existingKeysBySchema = new HashMap<>();
-    this.existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap);
+    this.existingKeysBySchema.put(schema, keyPartitionMap);
     this.numKeysBySchema = new HashMap<>();
-    this.numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size());
+    this.numKeysBySchema.put(schema, keyPartitionMap.size());
 
     logger.info(String.format("Test DataGenerator's seed (%s)", seed));
   }
@@ -223,6 +235,8 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
       return generatePayloadForTripSchema(key, commitTime);
     } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
       return generatePayloadForShortTripSchema(key, commitTime);
+    } else if (TRIP_NESTED_EXAMPLE_SCHEMA.equals(schemaStr)) {
+      return generateNestedExampleRandomValue(key, commitTime);
     }
 
     return null;
@@ -255,6 +269,11 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     return generateRandomValue(key, instantTime, isFlattened, 0);
   }
 
+  private RawTripTestPayload generateNestedExampleRandomValue(
+      HoodieKey key, String instantTime) throws IOException {
+    return generateNestedExampleRandomValue(key, instantTime, 0);
+  }
+
   private RawTripTestPayload generateRandomValue(
       HoodieKey key, String instantTime, boolean isFlattened, int ts) throws 
IOException {
     GenericRecord rec = generateGenericRecord(
@@ -263,6 +282,14 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     return new RawTripTestPayload(rec.toString(), key.getRecordKey(), 
key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
   }
 
+  private RawTripTestPayload generateNestedExampleRandomValue(
+      HoodieKey key, String instantTime, int ts) throws IOException {
+    GenericRecord rec = generateNestedExampleGenericRecord(
+        key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, 
"driver-" + instantTime, ts,
+        false);
+    return new RawTripTestPayload(rec.toString(), key.getRecordKey(), 
key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
+  }
+
   /**
    * Generates a new avro record with TRIP_SCHEMA, retaining the key if 
optionally provided.
    */
@@ -298,10 +325,11 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     return generateGenericRecord(rowKey, partitionPath, riderName, driverName, 
timestamp, false, false);
   }
 
-  public GenericRecord generateGenericRecord(String rowKey, String 
partitionPath, String riderName, String driverName,
-                                                    long timestamp, boolean 
isDeleteRecord,
-                                                    boolean isFlattened) {
-    GenericRecord rec = new GenericData.Record(isFlattened ? 
FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
+
+  /**
+   * Populate rec with values for TRIP_SCHEMA_PREFIX
+   */
+  private void generateTripPrefixValues(GenericRecord rec, String rowKey, 
String partitionPath, String riderName, String driverName, long timestamp) {
     rec.put("_row_key", rowKey);
     rec.put("timestamp", timestamp);
     rec.put("partition_path", partitionPath);
@@ -311,47 +339,108 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     rec.put("begin_lon", rand.nextDouble());
     rec.put("end_lat", rand.nextDouble());
     rec.put("end_lon", rand.nextDouble());
-    if (isFlattened) {
-      rec.put("fare", rand.nextDouble() * 100);
-      rec.put("currency", "USD");
-    } else {
-      rec.put("distance_in_meters", rand.nextInt());
-      rec.put("seconds_since_epoch", rand.nextLong());
-      rec.put("weight", rand.nextFloat());
-      byte[] bytes = "Canada".getBytes();
-      rec.put("nation", ByteBuffer.wrap(bytes));
-      long randomMillis = genRandomTimeMillis(rand);
-      Instant instant = Instant.ofEpochMilli(randomMillis);
-      rec.put("current_date", (int) LocalDateTime.ofInstant(instant, 
ZoneOffset.UTC).toLocalDate().toEpochDay());
-      rec.put("current_ts", randomMillis);
-
-      BigDecimal bigDecimal = new BigDecimal(String.format("%5f", 
rand.nextFloat()));
-      Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
-      Conversions.DecimalConversion decimalConversions = new 
Conversions.DecimalConversion();
-      GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, 
decimalSchema, LogicalTypes.decimal(10, 6));
-      rec.put("height", genericFixed);
-
-      rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
-
-      GenericRecord fareRecord = new 
GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
-      fareRecord.put("amount", rand.nextDouble() * 100);
-      fareRecord.put("currency", "USD");
-      rec.put("fare", fareRecord);
-
-      GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, 
AVRO_SCHEMA.getField("tip_history").schema());
-      Schema tipSchema = new 
Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
-      GenericRecord tipRecord = new GenericData.Record(tipSchema);
-      tipRecord.put("amount", rand.nextDouble() * 100);
-      tipRecord.put("currency", "USD");
-      tipHistoryArray.add(tipRecord);
-      rec.put("tip_history", tipHistoryArray);
-    }
+  }
+
+  /**
+   * Populate rec with values for FARE_FLATTENED_SCHEMA
+   */
+  private void generateFareFlattenedValues(GenericRecord rec) {
+    rec.put("fare", rand.nextDouble() * 100);
+    rec.put("currency", "USD");
+  }
+
+  /**
+   * Populate rec with values for EXTRA_TYPE_SCHEMA
+   */
+  private void generateExtraSchemaValues(GenericRecord rec) {
+    rec.put("distance_in_meters", rand.nextInt());
+    rec.put("seconds_since_epoch", rand.nextLong());
+    rec.put("weight", rand.nextFloat());
+    byte[] bytes = "Canada".getBytes();
+    rec.put("nation", ByteBuffer.wrap(bytes));
+    long randomMillis = genRandomTimeMillis(rand);
+    Instant instant = Instant.ofEpochMilli(randomMillis);
+    rec.put("current_date", (int) LocalDateTime.ofInstant(instant, 
ZoneOffset.UTC).toLocalDate().toEpochDay());
+    rec.put("current_ts", randomMillis);
+
+    BigDecimal bigDecimal = new BigDecimal(String.format("%5f", 
rand.nextFloat()));
+    Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
+    Conversions.DecimalConversion decimalConversions = new 
Conversions.DecimalConversion();
+    GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, 
decimalSchema, LogicalTypes.decimal(10, 6));
+    rec.put("height", genericFixed);
+  }
+
+  /**
+   * Populate rec with values for MAP_TYPE_SCHEMA
+   */
+  private void generateMapTypeValues(GenericRecord rec) {
+    rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
+  }
 
+  /**
+   * Populate rec with values for FARE_NESTED_SCHEMA
+   */
+  private void generateFareNestedValues(GenericRecord rec) {
+    GenericRecord fareRecord = new 
GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
+    fareRecord.put("amount", rand.nextDouble() * 100);
+    fareRecord.put("currency", "USD");
+    rec.put("fare", fareRecord);
+  }
+
+  /**
+   * Populate rec with values for TIP_NESTED_SCHEMA
+   */
+  private void generateTipNestedValues(GenericRecord rec) {
+    GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, 
AVRO_SCHEMA.getField("tip_history").schema());
+    Schema tipSchema = new 
Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
+    GenericRecord tipRecord = new GenericData.Record(tipSchema);
+    tipRecord.put("amount", rand.nextDouble() * 100);
+    tipRecord.put("currency", "USD");
+    tipHistoryArray.add(tipRecord);
+    rec.put("tip_history", tipHistoryArray);
+  }
+
+  /**
+   * Populate rec with values for TRIP_SCHEMA_SUFFIX
+   */
+  private void generateTripSuffixValues(GenericRecord rec, boolean 
isDeleteRecord) {
     if (isDeleteRecord) {
       rec.put("_hoodie_is_deleted", true);
     } else {
       rec.put("_hoodie_is_deleted", false);
     }
+  }
+
+
+  /**
+   * Generate a record conforming to TRIP_FLATTENED_SCHEMA if isFlattened is true, 
or to TRIP_EXAMPLE_SCHEMA otherwise
+   */
+  public GenericRecord generateGenericRecord(String rowKey, String 
partitionPath, String riderName, String driverName,
+                                                    long timestamp, boolean 
isDeleteRecord,
+                                                    boolean isFlattened) {
+    GenericRecord rec = new GenericData.Record(isFlattened ? 
FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
+    generateTripPrefixValues(rec, rowKey, partitionPath, riderName, 
driverName, timestamp);
+    if (isFlattened) {
+      generateFareFlattenedValues(rec);
+    } else {
+      generateExtraSchemaValues(rec);
+      generateMapTypeValues(rec);
+      generateFareNestedValues(rec);
+      generateTipNestedValues(rec);
+    }
+    generateTripSuffixValues(rec, isDeleteRecord);
+    return rec;
+  }
+
+  /**
+   * Generate record conforming to TRIP_NESTED_EXAMPLE_SCHEMA
+   */
+  public  GenericRecord generateNestedExampleGenericRecord(String rowKey, 
String partitionPath, String riderName, String driverName,
+                                                        long timestamp, 
boolean isDeleteRecord) {
+    GenericRecord rec = new GenericData.Record(NESTED_AVRO_SCHEMA);
+    generateTripPrefixValues(rec, rowKey, partitionPath, riderName, 
driverName, timestamp);
+    generateFareNestedValues(rec);
+    generateTripSuffixValues(rec, isDeleteRecord);
     return rec;
   }
 
@@ -474,13 +563,17 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   }
 
   /**
-   * Generates new inserts with nested schema, uniformly across the partition 
paths above.
+   * Generates new inserts for TRIP_EXAMPLE_SCHEMA with nested schema, 
uniformly across the partition paths above.
    * It also updates the list of existing keys.
    */
   public List<HoodieRecord> generateInserts(String instantTime, Integer n) {
     return generateInserts(instantTime, n, false);
   }
 
+  public List<HoodieRecord> generateInsertsNestedExample(String instantTime, 
Integer n) {
+    return generateInsertsStream(instantTime, n, false, 
TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList());
+  }
+
   /**
    * Generates new inserts, uniformly across the partition paths above.
    * It also updates the list of existing keys.
@@ -721,6 +814,10 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     return generateUniqueUpdatesStream(instantTime, n, 
TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
   }
 
+  public List<HoodieRecord> generateUniqueUpdatesNestedExample(String 
instantTime, Integer n) {
+    return generateUniqueUpdatesStream(instantTime, n, 
TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList());
+  }
+
   public List<HoodieRecord> generateUniqueUpdatesAsPerSchema(String 
instantTime, Integer n, String schemaStr) {
     return generateUniqueUpdatesStream(instantTime, n, 
schemaStr).collect(Collectors.toList());
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
new file mode 100644
index 00000000000..e1b888aca20
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, 
HoodieSparkUtils}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.spark.sql
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.scalatest.Inspectors.forAll
+
+import java.io.File
+import scala.collection.JavaConversions._
+
+
+class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
+  val colsToCompare = "timestamp, _row_key, partition_path, rider, driver, 
begin_lat, begin_lon, end_lat, end_lon, fare.amount, fare.currency, 
_hoodie_is_deleted"
+
+  //params for core flow tests
+  val params: List[String] = List(
+        
"COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+        
"COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+        
"COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+        
"COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+        
"COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+        
"COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+        
"COPY_ON_WRITE|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+        
"COPY_ON_WRITE|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+        
"COPY_ON_WRITE|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+        
"MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+        
"MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+        
"MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+        
"MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+        
"MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+        
"MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+        
"MERGE_ON_READ|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+        
"MERGE_ON_READ|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+        
"MERGE_ON_READ|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM"
+      )
+
+  //extracts the params and runs each core flow test
+  forAll (params) { (paramStr: String) =>
+    test(s"Core flow with params: $paramStr") {
+      val splits = paramStr.split('|')
+      withTempDir { basePath =>
+        testCoreFlows(basePath,
+          tableType = splits(0),
+          isMetadataEnabledOnWrite = splits(1).toBoolean,
+          isMetadataEnabledOnRead = splits(2).toBoolean,
+          keyGenClass = splits(3),
+          indexType = splits(4))
+      }
+    }
+  }
+
+  def testCoreFlows(basePath: File, tableType: String, 
isMetadataEnabledOnWrite: Boolean, isMetadataEnabledOnRead: Boolean, 
keyGenClass: String, indexType: String): Unit = {
+    //Create table and set up for testing
+    val tableName = generateTableName
+    val tableBasePath = basePath.getCanonicalPath + "/" + tableName
+    val writeOptions = getWriteOptions(tableName, tableType, 
isMetadataEnabledOnWrite, keyGenClass, indexType)
+    createTable(tableName, keyGenClass, writeOptions, tableBasePath)
+    val fs = FSUtils.getFs(tableBasePath, 
spark.sparkContext.hadoopConfiguration)
+    val dataGen = new 
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 
0xDEED)
+
+    //Bulk insert first set of records
+    val inputDf0 = generateInserts(dataGen, "000", 100)
+    inputDf0.cache()
+    insertInto(tableName, inputDf0, "bulk_insert", isMetadataEnabledOnWrite, 
keyGenClass)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, tableBasePath, "000"))
+    //Verify bulk insert works correctly
+    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+    snapshotDf1.cache()
+    assertEquals(100, snapshotDf1.count())
+    compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
+
+    //Test updated records
+    val updateDf = generateUniqueUpdates(dataGen, "001", 50)
+    insertInto(tableName, updateDf, "upsert", isMetadataEnabledOnWrite, 
keyGenClass)
+    val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, 
tableBasePath)
+    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+    snapshotDf2.cache()
+    assertEquals(100, snapshotDf2.count())
+    compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
+
+    val inputDf2 = generateUniqueUpdates(dataGen, "002", 60)
+    val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
+    insertInto(tableName, inputDf2, "upsert", isMetadataEnabledOnWrite, 
keyGenClass)
+    val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, 
tableBasePath)
+    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, 
tableBasePath, "000").size())
+
+    val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+    snapshotDf3.cache()
+    assertEquals(100, snapshotDf3.count())
+    compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf3)
+
+    // Read Incremental Query, need to use spark-ds because functionality does 
not exist for spark sql
+    // we have 3 commits, try pulling the first commit (which is not the 
latest)
+    //HUDI-5266
+    val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, 
tableBasePath, "000").get(0)
+    val hoodieIncViewDf1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit)
+      .load(tableBasePath)
+    //val hoodieIncViewDf1 = doIncRead(tableName, isMetadataEnabledOnRead, 
"000", firstCommit)
+    assertEquals(100, hoodieIncViewDf1.count()) // 100 initial inserts must be 
pulled
+    var countsPerCommit = 
hoodieIncViewDf1.groupBy("_hoodie_commit_time").count().collect()
+    assertEquals(1, countsPerCommit.length)
+    assertEquals(firstCommit, countsPerCommit(0).get(0).toString)
+
+    val inputDf3 = generateUniqueUpdates(dataGen, "003", 80)
+    insertInto(tableName, inputDf3, "upsert", isMetadataEnabledOnWrite, 
keyGenClass)
+
+    //another incremental query with commit2 and commit3
+    //HUDI-5266
+    val hoodieIncViewDf2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), commitInstantTime3)
+      .load(tableBasePath)
+
+    assertEquals(uniqueKeyCnt2, hoodieIncViewDf2.count()) // 60 records must 
be pulled
+    countsPerCommit = 
hoodieIncViewDf2.groupBy("_hoodie_commit_time").count().collect()
+    assertEquals(1, countsPerCommit.length)
+    assertEquals(commitInstantTime3, countsPerCommit(0).get(0).toString)
+
+
+    val timeTravelDf = if (HoodieSparkUtils.gteqSpark3_2_1) {
+      spark.sql(s"select * from $tableName timestamp as of 
'$commitInstantTime2'")
+    } else {
+      //HUDI-5265
+      spark.read.format("org.apache.hudi")
+        .option("as.of.instant", commitInstantTime2)
+        .load(tableBasePath)
+    }
+    assertEquals(100, timeTravelDf.count())
+    compareEntireInputDfWithHudiDf(snapshotDf2, timeTravelDf)
+
+    if (tableType.equals("MERGE_ON_READ")) {
+      val readOptDf = doMORReadOptimizedQuery(isMetadataEnabledOnRead, 
tableBasePath)
+      compareEntireInputDfWithHudiDf(inputDf0, readOptDf)
+
+      val snapshotDf4 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+      snapshotDf4.cache()
+
+      // trigger compaction and try out Read optimized query.
+      val inputDf4 = generateUniqueUpdates(dataGen, "004", 40)
+      doInlineCompact(tableName, inputDf4, "upsert", isMetadataEnabledOnWrite, 
"3", keyGenClass)
+      val snapshotDf5 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+      snapshotDf5.cache()
+      compareUpdateDfWithHudiDf(inputDf4, snapshotDf5, snapshotDf4)
+
+      // compaction is expected to have completed. both RO and RT are expected 
to return same results.
+      compareROAndRT(isMetadataEnabledOnRead, tableName, tableBasePath)
+    }
+  }
+
+  def doSnapshotRead(tableName: String, isMetadataEnabledOnRead: Boolean): 
sql.DataFrame = {
+    spark.sql("set hoodie.datasource.query.type=\"snapshot\"")
+    spark.sql(s"set 
hoodie.metadata.enable=${String.valueOf(isMetadataEnabledOnRead)}")
+    spark.sql(s"select * from $tableName")
+  }
+
+  def doInlineCompact(tableName: String, recDf: sql.DataFrame, writeOp: 
String, isMetadataEnabledOnWrite: Boolean, numDeltaCommits: String, 
keyGenClass: String): Unit = {
+    spark.sql("set hoodie.compact.inline=true")
+    spark.sql(s"set hoodie.compact.inline.max.delta.commits=$numDeltaCommits")
+    insertInto(tableName, recDf, writeOp, isMetadataEnabledOnWrite, 
keyGenClass)
+    spark.sql("set hoodie.compact.inline=false")
+  }
+
+  def getWriteOptions(tableName: String, tableType: String, 
isMetadataEnabledOnWrite: Boolean,
+                      keyGenClass: String, indexType: String): String = {
+    val typeString = if (tableType.equals("COPY_ON_WRITE")) {
+      "cow"
+    } else if (tableType.equals("MERGE_ON_READ")) {
+      "mor"
+    } else {
+      tableType
+    }
+
+    s"""
+       |tblproperties (
+       |  type = '$typeString',
+       |  primaryKey = '_row_key',
+       |  preCombineField = 'timestamp',
+       |  hoodie.bulkinsert.shuffle.parallelism = 4,
+       |  hoodie.database.name = "databaseName",
+       |  hoodie.table.keygenerator.class = '$keyGenClass',
+       |  hoodie.delete.shuffle.parallelism = 2,
+       |  hoodie.index.type = "$indexType",
+       |  hoodie.insert.shuffle.parallelism = 4,
+       |  hoodie.metadata.enable = ${String.valueOf(isMetadataEnabledOnWrite)},
+       |  hoodie.table.name = "$tableName",
+       |  hoodie.upsert.shuffle.parallelism = 4
+       | )""".stripMargin
+  }
+
+  def insertInto(tableName: String, inputDf: sql.DataFrame, writeOp: String, 
isMetadataEnabledOnWrite: Boolean, keyGenClass: String): Unit = {
+    inputDf.select("timestamp", "_row_key", "rider", "driver", "begin_lat", 
"begin_lon", "end_lat", "end_lon", "fare",
+      "_hoodie_is_deleted", 
"partition_path").createOrReplaceTempView("insert_temp_table")
+    spark.sql(s"set 
hoodie.metadata.enable=${String.valueOf(isMetadataEnabledOnWrite)}")
+    spark.sql(s"set hoodie.datasource.write.keygenerator.class=$keyGenClass")
+    if (writeOp.equals("upsert")) {
+      spark.sql(s"set hoodie.datasource.write.operation=$writeOp")
+      spark.sql("set hoodie.sql.bulk.insert.enable=false")
+      spark.sql("set hoodie.sql.insert.mode=upsert")
+      spark.sql(
+        s"""
+           | merge into $tableName as target
+           | using insert_temp_table as source
+           | on target._row_key = source._row_key and
+           | target.partition_path = source.partition_path
+           | when matched then update set *
+           | when not matched then insert *
+           | """.stripMargin)
+    } else if (writeOp.equals("bulk_insert")) {
+      //Once HUDI-5257 is resolved, the write operation here should be bulk_insert, and 
this function can be made more
+      //compact by removing the repeated code
+      spark.sql("set hoodie.datasource.write.operation=insert")
+      spark.sql("set hoodie.sql.bulk.insert.enable=true")
+      spark.sql("set hoodie.sql.insert.mode=non-strict")
+      spark.sql(s"insert into $tableName select * from insert_temp_table")
+    } else if (writeOp.equals("insert")) {
+      spark.sql(s"set hoodie.datasource.write.operation=$writeOp")
+      spark.sql("set hoodie.sql.bulk.insert.enable=false")
+      spark.sql("set hoodie.sql.insert.mode=non-strict")
+      spark.sql(s"insert into $tableName select * from insert_temp_table")
+    }
+  }
+  def createTable(tableName: String, keyGenClass: String, writeOptions: 
String, tableBasePath: String): Unit = {
+    //If you have partitioned by (partition_path) with nonpartitioned keygen, 
the partition_path will be empty in the table
+    val partitionedBy = if 
(!keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")) {
+      "partitioned by (partition_path)"
+    } else {
+      ""
+    }
+
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |  timestamp long,
+         |  _row_key string,
+         |  rider string,
+         |  driver string,
+         |  begin_lat double,
+         |  begin_lon double,
+         |  end_lat double,
+         |  end_lon double,
+         |  fare STRUCT<
+         |    amount: double,
+         |    currency: string >,
+         |  _hoodie_is_deleted boolean,
+         |  partition_path string
+         |) using hudi
+         | $partitionedBy
+         | $writeOptions
+         | location '$tableBasePath'
+         |
+    """.stripMargin)
+  }
+
+  def generateInserts(dataGen: HoodieTestDataGenerator, instantTime: String, 
n: Int): sql.DataFrame = {
+    val recs = dataGen.generateInsertsNestedExample(instantTime, n)
+    spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs), 2))
+  }
+
+  def generateUniqueUpdates(dataGen: HoodieTestDataGenerator, instantTime: 
String, n: Int): sql.DataFrame = {
+    val recs = dataGen.generateUniqueUpdatesNestedExample(instantTime, n)
+    spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs), 2))
+  }
+
+  def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], 
beforeDf: Dataset[Row]): Unit = {
+    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
+    inputDf.createOrReplaceTempView("inputTbl")
+    beforeDf.createOrReplaceTempView("beforeTbl")
+    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
+    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
+    val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from beforeTbl")
+
+    assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, 
inputDfToCompare.count)
+    
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
 0)
+  }
+
+  def compareEntireInputDfWithHudiDf(inputDf: Dataset[Row], hudiDf: 
Dataset[Row]): Unit = {
+    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
+    inputDf.createOrReplaceTempView("inputTbl")
+    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
+    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
+
+    assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, 
inputDfToCompare.count)
+    assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
+  }
+
+  def doMORReadOptimizedQuery(isMetadataEnabledOnRead: Boolean, basePath: 
String): sql.DataFrame = {
+    spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead)
+      .load(basePath)
+  }
+
+  // Asserts that the read-optimized view of a table is consistent with its snapshot view,
+  // looking only at the columns named in colsToCompare (Hudi meta columns are dropped first).
+  //
+  // isMetadataEnabledOnRead: forwarded to both reads
+  // tableName:               table used for the snapshot read
+  // basePath:                path used for the read-optimized read
+  //
+  // Side effect: registers (or replaces) the temp views "hudiTbl1" and "hudiTbl2".
+  def compareROAndRT(isMetadataEnabledOnRead: Boolean, tableName: String, basePath: String): Unit = {
+    val roDf = doMORReadOptimizedQuery(isMetadataEnabledOnRead, basePath)
+    val rtDf = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+    dropMetaColumns(roDf).createOrReplaceTempView("hudiTbl1")
+    dropMetaColumns(rtDf).createOrReplaceTempView("hudiTbl2")
+
+    val hudiDf1ToCompare = spark.sqlContext.sql("select " + colsToCompare + " from hudiTbl1")
+    val hudiDf2ToCompare = spark.sqlContext.sql("select " + colsToCompare + " from hudiTbl2")
+
+    // Expected value first, like the other assertEquals calls in this suite, so that a failure
+    // message labels expected/actual correctly.
+    // Only one direction is checked: every read-optimized row must also be in the snapshot.
+    assertEquals(hudiDf1ToCompare.count, hudiDf1ToCompare.intersect(hudiDf2ToCompare).count)
+    assertEquals(0, hudiDf1ToCompare.except(hudiDf2ToCompare).count)
+  }
+
+  // Returns inputDf without the five columns named by the HoodieRecord *_METADATA_FIELD
+  // constants listed below.
+  def dropMetaColumns(inputDf: sql.DataFrame): sql.DataFrame = {
+    val metaColumnNames = Seq(
+      HoodieRecord.RECORD_KEY_METADATA_FIELD,
+      HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+      HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
+      HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+      HoodieRecord.FILENAME_METADATA_FIELD)
+    inputDf.drop(metaColumnNames: _*)
+  }
+
+  // Params for the immutable user flow. Each entry has the form
+  //   tableType|operation|isMetadataEnabledOnWrite|isMetadataEnabledOnRead|keyGenClass|indexType
+  // The list is the full cross product of the dimensions below (2 * 2 * 3 * 3 = 36 entries),
+  // ordered by operation, then table type, then key generator/index pair, then metadata flags.
+  val paramsForImmutable: List[String] = for {
+    operation <- List("insert", "bulk_insert")
+    tableType <- List("COPY_ON_WRITE", "MERGE_ON_READ")
+    (keyGenClass, indexType) <- List(
+      ("org.apache.hudi.keygen.SimpleKeyGenerator", "BLOOM"),
+      ("org.apache.hudi.keygen.SimpleKeyGenerator", "SIMPLE"),
+      ("org.apache.hudi.keygen.NonpartitionedKeyGenerator", "GLOBAL_BLOOM"))
+    // (metadata enabled on write, metadata enabled on read)
+    (metadataOnWrite, metadataOnRead) <- List(("false", "false"), ("true", "false"), ("true", "true"))
+  } yield s"$tableType|$operation|$metadataOnWrite|$metadataOnRead|$keyGenClass|$indexType"
+
+  // Registers one test per entry of paramsForImmutable. The entry is split on '|' and its six
+  // fields are handed, in order, to testImmutableUserFlow together with a fresh temp directory.
+  forAll(paramsForImmutable) { (encodedParams: String) =>
+    test(s"Immutable user flow with params: $encodedParams") {
+      val fields = encodedParams.split('|')
+      withTempDir { basePath =>
+        val tableType = fields(0)
+        val operation = fields(1)
+        val metadataOnWrite = fields(2).toBoolean
+        val metadataOnRead = fields(3).toBoolean
+        val keyGenClass = fields(4)
+        val indexType = fields(5)
+        testImmutableUserFlow(basePath, tableType, operation, metadataOnWrite, metadataOnRead, keyGenClass, indexType)
+      }
+    }
+  }
+
+  // Insert-only ("immutable") flow: creates a table under basePath, writes three generated batches
+  // of 100, 50 and 60 inserts, and after each write checks that a snapshot read returns exactly
+  // the rows written so far.
+  //
+  // The first batch is always written with "bulk_insert"; `operation` is used for the second and
+  // third batches only.
+  def testImmutableUserFlow(basePath: File, tableType: String, operation: String, isMetadataEnabledOnWrite: Boolean,
+                            isMetadataEnabledOnRead: Boolean, keyGenClass: String, indexType: String): Unit = {
+    val targetTable = generateTableName
+    val tablePath = basePath.getCanonicalPath + "/" + targetTable
+    val tableWriteOptions = getWriteOptions(targetTable, tableType, isMetadataEnabledOnWrite, keyGenClass, indexType)
+    createTable(targetTable, keyGenClass, tableWriteOptions, tablePath)
+    val fileSystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration)
+
+    // Reads a snapshot of the table, caches it, and checks both its row count and its contents.
+    def verifySnapshot(expectedRowCount: Long, expectedRows: Dataset[Row]): Unit = {
+      val snapshot = doSnapshotRead(targetTable, isMetadataEnabledOnRead)
+      snapshot.cache()
+      assertEquals(expectedRowCount, snapshot.count())
+      compareEntireInputDfWithHudiDf(expectedRows, snapshot)
+    }
+
+    // First batch
+    val generator = new HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 0xDEED)
+    val firstBatch = generateInserts(generator, "000", 100)
+    insertInto(targetTable, firstBatch, "bulk_insert", isMetadataEnabledOnWrite, keyGenClass)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fileSystem, tablePath, "000"))
+    verifySnapshot(100, firstBatch)
+
+    // Second batch
+    val secondBatch = generateInserts(generator, "001", 50)
+    insertInto(targetTable, secondBatch, operation, isMetadataEnabledOnWrite, keyGenClass)
+    verifySnapshot(150, secondBatch.union(firstBatch))
+
+    // Third batch
+    val thirdBatch = generateInserts(generator, "002", 60)
+    thirdBatch.cache()
+    insertInto(targetTable, thirdBatch, operation, isMetadataEnabledOnWrite, keyGenClass)
+    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fileSystem, tablePath, "000").size())
+    verifySnapshot(210, secondBatch.union(firstBatch).union(thirdBatch))
+  }
+
+}


Reply via email to