yuxiqian commented on code in PR #4444:
URL: https://github.com/apache/flink-cdc/pull/4444#discussion_r3475539283


##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java:
##########
@@ -170,50 +171,30 @@ void testSyncWholeDatabase() throws Exception {
             waitUntilSpecificEvent(
                     "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], 
after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}");
 
-            waitUntilSpecificEvent(
-                    "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, 
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` 
VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691841, user_4, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691841, user_3, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691841, user_2, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_13, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_14, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_11, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_12, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_21, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691841, user_6, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691841, user_5, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691841, user_9, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_19, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_20, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691841, user_8, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_17, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_18, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_15, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691841, user_7, Shanghai, 123567891234], op=INSERT, meta=()}");
-            waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691842, user_16, Shanghai, 123567891234], op=INSERT, meta=()}");
+            waitUntilAnySpecificEvent(
+                    "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, 
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` 
VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}",
+                    "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, 
schema=columns={`ID` DECIMAL(38, 0) NOT NULL,`NAME` VARCHAR(255) NOT 
NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, 
options=()}");
+            waitUntilCustomerInsert("DEBEZIUM.CUSTOMERS", 101, "user_1");

Review Comment:
   Could these assertions be written in order?



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java:
##########
@@ -339,6 +344,7 @@ void testAssortedSchemaTransform(boolean batchMode) throws 
Exception {
     void testWildcardSchemaTransform(boolean batchMode) throws Exception {
         String startupMode = batchMode ? "snapshot" : "initial";
         String runtimeMode = batchMode ? "BATCH" : "STREAMING";
+        int testParallelism = 1;

Review Comment:
   Why doesn't this case work in multiple parallelism mode?



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java:
##########
@@ -224,56 +205,84 @@ void testSyncWholeDatabase() throws Exception {
                     "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[106, 
hammer, 16oz carpenters hammer, 1.0], after=[106, hammer, 18oz carpenter 
hammer, 1.0], op=UPDATE, meta=()}");
             waitUntilSpecificEvent(
                     "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107, 
rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 
5.1], op=UPDATE, meta=()}");
-            waitUntilSpecificEvent(
-                    "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, 
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` 
VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");

Review Comment:
   The original test case looks suspicious. Why does `DEBEZIUM.CUSTOMERS`'s
   primary key `ID INT NOT NULL` map to a BIGINT, and why has its value changed?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java:
##########
@@ -503,8 +508,11 @@ private void testRemoveAndAddCollectionsOneByOne(
             // assert fetched changelog data in this round
             MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
 
-            MongoDBAssertUtils.assertEqualsInAnyOrder(
-                    fetchedDataList, 
TestValuesTableFactory.getRawResultsAsStrings("sink"));
+            assertEqualsInAnyOrderWithAllowedDuplicateUpdatePair(
+                    fetchedDataList,
+                    TestValuesTableFactory.getRawResultsAsStrings("sink"),
+                    collection0UpdateBefore,
+                    collection0UpdateAfter);

Review Comment:
   This assertion is really cryptic. If I understand correctly, it is basically asserting this:
   
   ```java
   assertThat(TestValuesTableFactory.getRawResultsAsStrings("sink"))
           .satisfiesAnyOf(
                   actual -> assertThat(actual)
                           .containsExactlyInAnyOrderElementsOf(expected),
                   actual -> assertThat(actual)
                           .containsExactlyInAnyOrderElementsOf(expectedWithRetryDuplicate));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to