yuzelin commented on code in PR #1010:
URL: https://github.com/apache/incubator-paimon/pull/1010#discussion_r1179900363


##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingWarehouseITCase.java:
##########
@@ -214,6 +228,157 @@ public void testUserStory() throws Exception {
         dailyTaskHandler.cancel().get();
     }
 
+    @ParameterizedTest
+    @EnumSource(MergeEngine.class)
+    public void testUpdateWithPrimaryKey(MergeEngine mergeEngine) throws Exception {
+        init(createAndRegisterTempFile("").toString(), 1);
+
+        // Step1: define cleaned trade order table schema
+        Map<String, String> options = new HashMap<>();
+        options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
+        options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+        createTestTable(true, options);
+
+        // Step2: batch write some historical data
+        insertTestData();
+
+        // Step3: prepare expected data.
+        final Set<CleanedTradeOrder> expectedData =
+                new HashSet<CleanedTradeOrder>(4) {
+                    {
+                        add(
+                                parseData(
+                                        "0, 2023-04-23 14:23:25, buyer_id_0, 1, 1, 1, true, 1, 2022-04-23"));
+                        add(
+                                parseData(
+                                        "1, 2023-04-23 14:23:26, _ANONYMOUS_, 2, 2, 2, true, 2.0, 2022-04-23"));
+                        add(
+                                parseData(
+                                        "2, 2023-04-23 14:23:27, buyer_id_2, 3, 3, 3, true, 1, 2022-04-23"));
+                        add(
+                                parseData(
+                                        "3, 2023-04-24 14:23:28, 404NotFound, 4, 4, 4, true, -1, 2022-04-24"));
+                    }
+                };
+
+        // Step4: update some data
+        try {
+            updateTestData();
+            assertThat(mergeEngine).isIn(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE);
+
+            // Step5: verify result
+            String querySql = "SELECT * FROM cleaned_trade_order";
+            try (BlockingIterator<Row, CleanedTradeOrder> batchIter =
+                    BlockingIterator.of(bEnv.executeSql(querySql).collect(), ORDER_CONVERTER)) {
+                batchIter
+                        .collect()
+                        .forEach(
+                                data -> {
+                                    assertThat(expectedData.remove(data)).isTrue();
+                                });
+                assertThat(expectedData).isEmpty();
+            }
+        } catch (UnsupportedOperationException e) {
+            assertThat(mergeEngine).isNotIn(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE);
+        }
+    }

Review Comment:
   I recommend using `AssertionUtils` instead of try-catch.
   When the merge engine is expected to be unsupported, assert on the thrown exception:
   ```
   assertThatThrownBy(() -> ...)
           .satisfies(
                   AssertionUtils.anyCauseMatches(xxx.class, "..."));
   ```
   The same applies to `testUpdateWithoutPrimaryKey`.
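
   For context, here is a minimal, self-contained sketch of the cause-chain matching that an `anyCauseMatches`-style helper performs. The names and logic below are illustrative only, not Paimon's actual `AssertionUtils` implementation; the real helper returns an AssertJ `Consumer<Throwable>` rather than a boolean.
   ```java
   // Sketch: walk the cause chain of a thrown exception and check whether any
   // link is an instance of the expected type with a matching message fragment.
   public class CauseMatchSketch {

       /** Returns true if any throwable in the cause chain of {@code root} is an
        *  instance of {@code expectedType} whose message contains {@code fragment}. */
       static boolean anyCauseMatches(
               Throwable root, Class<? extends Throwable> expectedType, String fragment) {
           for (Throwable t = root; t != null; t = t.getCause()) {
               if (expectedType.isInstance(t)
                       && t.getMessage() != null
                       && t.getMessage().contains(fragment)) {
                   return true;
               }
           }
           return false;
       }

       public static void main(String[] args) {
           // Simulate a wrapped failure like the one a streaming job might surface:
           // the UnsupportedOperationException is buried under a RuntimeException.
           Throwable wrapped =
                   new RuntimeException(
                           "job failed",
                           new UnsupportedOperationException("merge engine not supported"));

           if (!anyCauseMatches(wrapped, UnsupportedOperationException.class, "not supported")) {
               throw new AssertionError("expected a matching cause in the chain");
           }
           if (anyCauseMatches(wrapped, IllegalStateException.class, "not supported")) {
               throw new AssertionError("matched a type that is not in the chain");
           }
           System.out.println("both checks passed");
       }
   }
   ```
   This is why the suggested pattern is more precise than a bare try-catch: it verifies the failure type and message anywhere in the cause chain, instead of only asserting that *some* exception escaped.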



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingWarehouseITCase.java:
##########
@@ -214,6 +228,157 @@ public void testUserStory() throws Exception {
         dailyTaskHandler.cancel().get();
     }
 
+    @ParameterizedTest
+    @EnumSource(MergeEngine.class)
+    public void testUpdateWithPrimaryKey(MergeEngine mergeEngine) throws Exception {

Review Comment:
   I prefer to place this test in `ReadWriteTableITCase` for 2 reasons:
   1. Many SQL-related tests are there. You could put the tests under the `Keyword` section or a new section like `Update statement`.
   2. This test class seems to be a dedicated class for a whole pipeline.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
