cshuo commented on code in PR #13027:
URL: https://github.com/apache/hudi/pull/13027#discussion_r2011402771


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java:
##########
@@ -167,6 +161,8 @@ public void testWriteMergeOnReadWithCompaction(String 
indexType) throws Exceptio
     conf.setString(FlinkOptions.INDEX_TYPE, indexType);
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    // use synchronized compaction to ensure flink job finishing with 
compaction completed.
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);

Review Comment:
   Writing pipeline in this test is customized, not same with that in 
`HoodieTableSink`. Compact operator is added if the table type is MOR, see Line 
285.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -541,24 +540,26 @@ void testReadWithPartitionStatsPruning(HoodieTableType 
tableType, boolean hiveSt
             "select * from t1 where uuid > 'id5' and age > 15",
             // filter by partition stats pruner and dynamic partition pruner
             "select * from t1 where uuid > 'id5' and age > 15 and `partition` 
> 'par3'");
-    List<String> expectResults =
+    List<List<Row>> expectResults =
         Arrays.asList(
-            "[+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], "
-                + "+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1], "
-                + "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
-                + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
-                + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
-                + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
-                + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
-                + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]",
-            "[+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
-                + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
-                + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]",
-            "[+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
-                + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]");
+            Arrays.asList(
+                row("id1", "Danny", 23, 
TimestampData.fromEpochMillis(1).toLocalDateTime(), "par1"),
+                row("id2", "Stephen", 33, 
TimestampData.fromEpochMillis(2).toLocalDateTime(), "par1"),
+                row("id3", "Julian", 53, 
TimestampData.fromEpochMillis(3).toLocalDateTime(), "par2"),
+                row("id4", "Fabian", 31, 
TimestampData.fromEpochMillis(4).toLocalDateTime(), "par2"),
+                row("id5", "Sophia", 18, 
TimestampData.fromEpochMillis(5).toLocalDateTime(), "par3"),

Review Comment:
   The changes is introduced for two reasons:
   1) reuse testing utils for expected result of type `List<Row>`
   2) String representation for results are not suitable for some data field, 
for example, MapType, because the order is not stable for Map entry. We can 
utilize utils from Flink `RowUtils#compareRows` to compare two `List<Row>`.
   
   For the readability, as the inserting source data is also in form of 
List<RowData>, maybe it's more straightforward to compare List<Row> and 
List<RowData>.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java:
##########
@@ -186,11 +182,12 @@ public void testWriteCopyOnWriteWithClustering(boolean 
sortClusteringEnabled) th
     writeWithClusterAndCheckExpected(conf, "cow_write_with_cluster", 1, 
EXPECTED);
   }
 
-  @Disabled("HUDI-9196")
   @ParameterizedTest
   @ValueSource(strings = {"COPY_ON_WRITE", "MERGE_ON_READ"})
   public void testStreamWriteWithIndexBootstrap(String tableType) throws 
Exception {
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);

Review Comment:
   Writing pipeline in this test is customized, not same with that in 
`HoodieTableSink`. Compact operator is added if the table type is MOR, see Line 
285.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1129,9 +1113,9 @@ void testWriteLocalIndex() {
     assertRowsEquals(result, expected, 3);
   }
 
-  @Test
-  void testStreamReadEmptyTablePath() throws Exception {
-    // case1: table metadata path does not exists
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testStreamReadEmptyTablePath(boolean initTable) throws Exception {

Review Comment:
   For the previous code, sometimes tm resources are not released after case1 
canceled, which lead to unexpected failure of case2. The change here is to make 
sure a new table environment is created for testing every case, and enhance the 
stability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to