This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b8f4a30  Fix Integration test flakiness in HoodieJavaStreamingApp 
(#1967)
b8f4a30 is described below

commit b8f4a30efd8bb6257c926a4b79a092cb96b8aff8
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Fri Aug 14 01:42:15 2020 -0700

    Fix Integration test flakiness in HoodieJavaStreamingApp (#1967)
---
 .../org/apache/hudi/integ/ITTestHoodieSanity.java  | 44 +++++++++++++---------
 .../org/apache/hudi/HoodieDataSourceHelpers.java   |  2 +-
 hudi-spark/src/test/java/HoodieJavaApp.java        |  4 ++
 .../src/test/java/HoodieJavaStreamingApp.java      |  4 +-
 .../hudi/functional/TestStructuredStreaming.scala  |  4 +-
 5 files changed, 37 insertions(+), 21 deletions(-)

diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index c7787a7..aba1d54 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.integ;
 
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -34,20 +35,23 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class ITTestHoodieSanity extends ITTestBase {
 
+  private static final String HDFS_BASE_URL =  "hdfs://namenode";
+  private static final String HDFS_STREAMING_SOURCE =  HDFS_BASE_URL + 
"/streaming/source/";
+  private static final String HDFS_STREAMING_CKPT =  HDFS_BASE_URL + 
"/streaming/ckpt/";
+
   enum PartitionType {
     SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
+  @Test
   /**
    * A basic integration test that runs HoodieJavaApp to create a sample COW 
Hoodie with single partition key data-set
    * and performs upserts on it. Hive integration and upsert functionality is 
checked by running a count query in hive
    * console.
    */
-  public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable(String command) 
throws Exception {
-    String hiveTableName = "docker_hoodie_single_partition_key_cow_test";
-    testRunHoodieJavaApp(command, hiveTableName, 
HoodieTableType.COPY_ON_WRITE.name(),
+  public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws 
Exception {
+    String hiveTableName = "docker_hoodie_single_partition_key_cow_test_" + 
HoodieActiveTimeline.createNewInstantTime();
+    testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
         PartitionType.SINGLE_KEY_PARTITIONED);
     dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
   }
@@ -59,9 +63,9 @@ public class ITTestHoodieSanity extends ITTestBase {
    * data-set and performs upserts on it. Hive integration and upsert 
functionality is checked by running a count query
    * in hive console.
    */
-  public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable(String command) 
throws Exception {
-    String hiveTableName = "docker_hoodie_multi_partition_key_cow_test";
-    testRunHoodieJavaApp(command, hiveTableName, 
HoodieTableType.COPY_ON_WRITE.name(),
+  public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws 
Exception {
+    String hiveTableName = "docker_hoodie_multi_partition_key_cow_test_" + 
HoodieActiveTimeline.createNewInstantTime();
+    testRunHoodieJavaApp(HOODIE_JAVA_APP, hiveTableName, 
HoodieTableType.COPY_ON_WRITE.name(),
         PartitionType.MULTI_KEYS_PARTITIONED);
     dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
   }
@@ -73,21 +77,20 @@ public class ITTestHoodieSanity extends ITTestBase {
    * console.
    */
   public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception {
-    String hiveTableName = "docker_hoodie_non_partition_key_cow_test";
+    String hiveTableName = "docker_hoodie_non_partition_key_cow_test_" + 
HoodieActiveTimeline.createNewInstantTime();
     testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), 
PartitionType.NON_PARTITIONED);
     dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
+  @Test
   /**
    * A basic integration test that runs HoodieJavaApp to create a sample MOR 
Hoodie with single partition key data-set
    * and performs upserts on it. Hive integration and upsert functionality is 
checked by running a count query in hive
    * console.
    */
-  public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable(String command) 
throws Exception {
-    String hiveTableName = "docker_hoodie_single_partition_key_mor_test";
-    testRunHoodieJavaApp(command, hiveTableName, 
HoodieTableType.MERGE_ON_READ.name(),
+  public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws 
Exception {
+    String hiveTableName = "docker_hoodie_single_partition_key_mor_test_" + 
HoodieActiveTimeline.createNewInstantTime();
+    testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
         PartitionType.SINGLE_KEY_PARTITIONED);
     dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
   }
@@ -100,7 +103,7 @@ public class ITTestHoodieSanity extends ITTestBase {
    * in hive console.
    */
   public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) 
throws Exception {
-    String hiveTableName = "docker_hoodie_multi_partition_key_mor_test";
+    String hiveTableName = "docker_hoodie_multi_partition_key_mor_test_" + 
HoodieActiveTimeline.createNewInstantTime();
     testRunHoodieJavaApp(command, hiveTableName, 
HoodieTableType.MERGE_ON_READ.name(),
         PartitionType.MULTI_KEYS_PARTITIONED);
     dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
@@ -113,7 +116,7 @@ public class ITTestHoodieSanity extends ITTestBase {
    * console.
    */
   public void testRunHoodieJavaAppOnNonPartitionedMORTable() throws Exception {
-    String hiveTableName = "docker_hoodie_non_partition_key_mor_test";
+    String hiveTableName = "docker_hoodie_non_partition_key_mor_test_" + 
HoodieActiveTimeline.createNewInstantTime();
     testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), 
PartitionType.NON_PARTITIONED);
     dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
   }
@@ -127,7 +130,7 @@ public class ITTestHoodieSanity extends ITTestBase {
       throws Exception {
 
     String hdfsPath = "/" + hiveTableName;
-    String hdfsUrl = "hdfs://namenode" + hdfsPath;
+    String hdfsUrl = HDFS_BASE_URL + hdfsPath;
 
     // Drop Table if it exists
     try {
@@ -155,6 +158,13 @@ public class ITTestHoodieSanity extends ITTestBase {
       cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " 
+ HIVE_SERVER_JDBC_URL
           + " --table-type " + tableType + " --hive-table " + hiveTableName + 
" --non-partitioned";
     }
+
+    if (command.equals(HOODIE_JAVA_STREAMING_APP)) {
+      String streamingSourcePath = HDFS_STREAMING_SOURCE + hiveTableName;
+      String streamingCkptPath = HDFS_STREAMING_CKPT + hiveTableName;
+      cmd = cmd + " --streaming-source-path " +  streamingSourcePath
+          + " --streaming-checkpointing-path " + streamingCkptPath;
+    }
     executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
 
     String snapshotTableName = 
tableType.equals(HoodieTableType.MERGE_ON_READ.name())
diff --git 
a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java 
b/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
index fccc65d..1467cf6 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
@@ -74,7 +74,7 @@ public class HoodieDataSourceHelpers {
     if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
       return metaClient.getActiveTimeline().getTimelineOfActions(
           CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
-              HoodieActiveTimeline.DELTA_COMMIT_ACTION));
+              
HoodieActiveTimeline.DELTA_COMMIT_ACTION)).filterCompletedInstants();
     } else {
       return metaClient.getCommitTimeline().filterCompletedInstants();
     }
diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java 
b/hudi-spark/src/test/java/HoodieJavaApp.java
index 594d813..9c42232 100644
--- a/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieDataSourceHelpers;
@@ -120,6 +121,9 @@ public class HoodieJavaApp {
       dataGen = new HoodieTestDataGenerator();
     }
 
+    // Explicitly clear up the hoodie table path if it exists.
+    fs.delete(new Path(tablePath), true);
+
     /**
      * Commit with only inserts
      */
diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java 
b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index e93784e..3b35ce9 100644
--- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -273,7 +273,9 @@ public class HoodieJavaStreamingApp {
   public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, 
String srcPath,
       int initialCommits, int expRecords,
       Dataset<Row> inputDF1, Dataset<Row> inputDF2, boolean 
instantTimeValidation) throws Exception {
-    inputDF1.write().mode(SaveMode.Append).json(srcPath);
+    // Ensure, we always write only one file. This is very important to ensure 
a single batch is reliably read
+    // atomically by one iteration of spark streaming.
+    inputDF1.coalesce(1).write().mode(SaveMode.Append).json(srcPath);
 
     int numExpCommits = initialCommits + 1;
     // wait for spark streaming to process one microbatch
diff --git 
a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
 
b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 011573b..226cf53 100644
--- 
a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ 
b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -102,7 +102,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     }
 
     val f2 = Future {
-      inputDF1.write.mode(SaveMode.Append).json(sourcePath)
+      inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
       // wait for spark streaming to process one microbatch
       val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
       assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
@@ -112,7 +112,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
         .load(destPath + "/*/*/*/*")
       assert(hoodieROViewDF1.count() == 100)
 
-      inputDF2.write.mode(SaveMode.Append).json(sourcePath)
+      inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
       // wait for spark streaming to process one microbatch
       waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
       val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, 
destPath)

Reply via email to