This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ff0e3f5 [HUDI-1612] Fix write test flakiness in StreamWriteITCase (#2567)
ff0e3f5 is described below
commit ff0e3f5669f1a70cbe9d302e8b1e6a5dbc801b25
Author: lamber-ken <[email protected]>
AuthorDate: Thu Feb 11 23:37:19 2021 +0800
[HUDI-1612] Fix write test flakiness in StreamWriteITCase (#2567)
* [HUDI-1612] Fix write test flakiness in StreamWriteITCase
---
.../apache/hudi/operator/StreamWriteITCase.java | 16 ++++---
.../org/apache/hudi/operator/utils/TestData.java | 56 +++++++++++++++++++++-
2 files changed, 63 insertions(+), 9 deletions(-)
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
index f745a3c..be8ec36 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -54,6 +54,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -65,13 +66,13 @@ import java.util.concurrent.TimeUnit;
*/
public class StreamWriteITCase extends TestLogger {
- private static final Map<String, String> EXPECTED = new HashMap<>();
+ private static final Map<String, List<String>> EXPECTED = new HashMap<>();
static {
- EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1,
id2,par1,id2,Stephen,33,2000,par1]");
- EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2,
id4,par2,id4,Fabian,31,4000,par2]");
- EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3,
id6,par3,id6,Emma,20,6000,par3]");
- EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4,
id8,par4,id8,Han,56,8000,par4]");
+ EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1",
"id2,par1,id2,Stephen,33,2000,par1"));
+ EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2",
"id4,par2,id4,Fabian,31,4000,par2"));
+ EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3",
"id6,par3,id6,Emma,20,6000,par3"));
+ EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4",
"id8,par4,id8,Han,56,8000,par4"));
}
@TempDir
@@ -85,6 +86,7 @@ public class StreamWriteITCase extends TestLogger {
execEnv.setParallelism(4);
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+ execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Read from file source
RowType rowType =
@@ -137,7 +139,7 @@ public class StreamWriteITCase extends TestLogger {
}
}
- TestData.checkWrittenData(tempFile, EXPECTED);
+ TestData.checkWrittenFullData(tempFile, EXPECTED);
}
@Test
@@ -215,6 +217,6 @@ public class StreamWriteITCase extends TestLogger {
}
}
- TestData.checkWrittenData(tempFile, EXPECTED);
+ TestData.checkWrittenFullData(tempFile, EXPECTED);
}
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
index b4c24ef..5b4131c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -18,7 +18,13 @@
package org.apache.hudi.operator.utils;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.data.RowData;
@@ -49,6 +55,7 @@ import static junit.framework.TestCase.assertEquals;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Data set for testing, also some utilities to check the results. */
public class TestData {
@@ -105,7 +112,7 @@ public class TestData {
*
* <p>Note: Replace it with the Flink reader when it is supported.
*
- * @param baseFile The file base to check, should be a directly
+ * @param baseFile The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the
partition path
*/
public static void checkWrittenData(File baseFile, Map<String, String>
expected) throws IOException {
@@ -117,7 +124,7 @@ public class TestData {
*
* <p>Note: Replace it with the Flink reader when it is supported.
*
- * @param baseFile The file base to check, should be a directly
+ * @param baseFile The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the
partition path
* @param partitions The expected partition number
*/
@@ -150,6 +157,51 @@ public class TestData {
}
/**
+   * Checks the source data are written as expected.
+   *
+   * <p>For every partition in {@code expected}, the records of the latest data file of each
+   * file group are read, passed through {@code filterOutVariables}, and compared with the
+   * expected rows. The comparison is order-insensitive: the row counts must match and every
+   * row that was read must be one of the expected rows.
+   *
+   * <p>Note: Replace it with the Flink reader when it is supported.
+   *
+   * @param basePath The file base to check, should be a directory
+   * @param expected The expected results mapping, the key should be the partition path
+   *                 and the value the rows expected in that partition
+   * @throws IOException declared for the table initialization calls
+   */
+  public static void checkWrittenFullData(
+      File basePath,
+      Map<String, List<String>> expected) throws IOException {
+
+    // 1. init flink table
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
+    HoodieWriteConfig config =
+        HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
+    FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
+    HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(supplier);
+    HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient);
+
+    // 2. check each partition data
+    expected.forEach((partition, partitionDataSet) -> {
+
+      List<String> readBuffer = new ArrayList<>();
+
+      table.getFileSystemView().getAllFileGroups(partition)
+          .forEach(v -> v.getLatestDataFile().ifPresent(baseFile -> {
+            String path = baseFile.getPath();
+            // try-with-resources so the parquet reader (and its file handle) is always
+            // released, even when reading fails part-way through the file
+            try (ParquetReader<GenericRecord> reader =
+                     AvroParquetReader.<GenericRecord>builder(new Path(path)).build()) {
+              GenericRecord nextRecord = reader.read();
+              while (nextRecord != null) {
+                readBuffer.add(filterOutVariables(nextRecord));
+                nextRecord = reader.read();
+              }
+            } catch (IOException e) {
+              // the lambda cannot throw a checked exception; keep the cause attached
+              throw new RuntimeException(e);
+            }
+          }));
+
+      // report what was actually read so that a failing run can be diagnosed
+      assertTrue(
+          partitionDataSet.size() == readBuffer.size() && partitionDataSet.containsAll(readBuffer),
+          "Unexpected data in partition " + partition
+              + ", expected: " + partitionDataSet + ", actual: " + readBuffer);
+
+    });
+
+  }
+
+ /**
* Filter out the variables like file name.
*/
private static String filterOutVariables(GenericRecord genericRecord) {