This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ff0e3f5  [HUDI-1612] Fix write test flakiness in StreamWriteITCase (#2567)
ff0e3f5 is described below

commit ff0e3f5669f1a70cbe9d302e8b1e6a5dbc801b25
Author: lamber-ken <[email protected]>
AuthorDate: Thu Feb 11 23:37:19 2021 +0800

    [HUDI-1612] Fix write test flakiness in StreamWriteITCase (#2567)
    
    * [HUDI-1612] Fix write test flakiness in StreamWriteITCase
---
 .../apache/hudi/operator/StreamWriteITCase.java    | 16 ++++---
 .../org/apache/hudi/operator/utils/TestData.java   | 56 +++++++++++++++++++++-
 2 files changed, 63 insertions(+), 9 deletions(-)

diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
index f745a3c..be8ec36 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -54,6 +54,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -65,13 +66,13 @@ import java.util.concurrent.TimeUnit;
  */
 public class StreamWriteITCase extends TestLogger {
 
-  private static final Map<String, String> EXPECTED = new HashMap<>();
+  private static final Map<String, List<String>> EXPECTED = new HashMap<>();
 
   static {
-    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, 
id2,par1,id2,Stephen,33,2000,par1]");
-    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, 
id4,par2,id4,Fabian,31,4000,par2]");
-    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, 
id6,par3,id6,Emma,20,6000,par3]");
-    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, 
id8,par4,id8,Han,56,8000,par4]");
+    EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", 
"id2,par1,id2,Stephen,33,2000,par1"));
+    EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", 
"id4,par2,id4,Fabian,31,4000,par2"));
+    EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", 
"id6,par3,id6,Emma,20,6000,par3"));
+    EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", 
"id8,par4,id8,Han,56,8000,par4"));
   }
 
   @TempDir
@@ -85,6 +86,7 @@ public class StreamWriteITCase extends TestLogger {
     execEnv.setParallelism(4);
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
     // Read from file source
     RowType rowType =
@@ -137,7 +139,7 @@ public class StreamWriteITCase extends TestLogger {
       }
     }
 
-    TestData.checkWrittenData(tempFile, EXPECTED);
+    TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 
   @Test
@@ -215,6 +217,6 @@ public class StreamWriteITCase extends TestLogger {
       }
     }
 
-    TestData.checkWrittenData(tempFile, EXPECTED);
+    TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
index b4c24ef..5b4131c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -18,7 +18,13 @@
 
 package org.apache.hudi.operator.utils;
 
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.table.data.RowData;
@@ -49,6 +55,7 @@ import static junit.framework.TestCase.assertEquals;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Data set for testing, also some utilities to check the results. */
 public class TestData {
@@ -105,7 +112,7 @@ public class TestData {
    *
    * <p>Note: Replace it with the Flink reader when it is supported.
    *
-   * @param baseFile The file base to check, should be a directly
+   * @param baseFile The file base to check, should be a directory
    * @param expected The expected results mapping, the key should be the 
partition path
    */
   public static void checkWrittenData(File baseFile, Map<String, String> 
expected) throws IOException {
@@ -117,7 +124,7 @@ public class TestData {
    *
    * <p>Note: Replace it with the Flink reader when it is supported.
    *
-   * @param baseFile   The file base to check, should be a directly
+   * @param baseFile   The file base to check, should be a directory
    * @param expected   The expected results mapping, the key should be the 
partition path
    * @param partitions The expected partition number
    */
@@ -150,6 +157,51 @@ public class TestData {
   }
 
   /**
+   * Checks the rows read from the latest base file of every file group in each expected
+   * partition: the row count must equal the expected list size and every row must be expected.
+   * <p>Note: Replace it with the Flink reader when it is supported.
+   *
+   * @param basePath   The table base path to check, should be a directory
+   * @param expected   The expected results mapping, the key should be the 
partition path
+   */
+  public static void checkWrittenFullData(
+      File basePath,
+      Map<String, List<String>> expected) throws IOException {
+
+    // 1. Build a HoodieFlinkTable over basePath so its file system view can be queried.
+    HoodieTableMetaClient metaClient = 
HoodieTestUtils.init(basePath.getAbsolutePath());
+    HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
+    FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
+    HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(supplier);
+    HoodieFlinkTable table = HoodieFlinkTable.create(config, context, 
metaClient);
+
+    // 2. Compare each expected partition's on-disk rows with its expected rows (unlisted partitions are not checked).
+    expected.forEach((partition, partitionDataSet) -> {
+      // Collects the filterOutVariables() form of every row in this partition's latest base files.
+      List<String> readBuffer = new ArrayList<>();
+      // NOTE(review): the ParquetReader opened below is never closed; consider try-with-resources.
+      table.getFileSystemView().getAllFileGroups(partition)
+          .forEach(v -> v.getLatestDataFile().ifPresent(baseFile -> {
+            String path = baseFile.getPath();
+            try {
+              ParquetReader<GenericRecord> reader = 
AvroParquetReader.<GenericRecord>builder(new Path(path)).build();
+              GenericRecord nextRecord = reader.read();
+              while (nextRecord != null) {
+                readBuffer.add(filterOutVariables(nextRecord));
+                nextRecord = reader.read();
+              }
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }));
+      // NOTE(review): size + containsAll cannot tell a duplicated row from a missing one.
+      assertTrue(partitionDataSet.size() == readBuffer.size() && 
partitionDataSet.containsAll(readBuffer));
+
+    });
+
+  }
+
+  /**
    * Filter out the variables like file name.
    */
   private static String filterOutVariables(GenericRecord genericRecord) {

Reply via email to