yuxiqian commented on code in PR #4444:
URL: https://github.com/apache/flink-cdc/pull/4444#discussion_r3475539283
##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java:
##########
@@ -170,50 +171,30 @@ void testSyncWholeDatabase() throws Exception {
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[],
after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS,
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS`
VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691841, user_4, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691841, user_3, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691841, user_2, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_13, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_14, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_11, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_12, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_21, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691841, user_6, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691841, user_5, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691841, user_9, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_19, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_20, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691841, user_8, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_17, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_18, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_15, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691841, user_7, Shanghai, 123567891234], op=INSERT, meta=()}");
- waitUntilSpecificEvent(
- "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691842, user_16, Shanghai, 123567891234], op=INSERT, meta=()}");
+ waitUntilAnySpecificEvent(
+ "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS,
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS`
VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}",
+ "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS,
schema=columns={`ID` DECIMAL(38, 0) NOT NULL,`NAME` VARCHAR(255) NOT
NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID,
options=()}");
+ waitUntilCustomerInsert("DEBEZIUM.CUSTOMERS", 101, "user_1");
Review Comment:
Write these assertions in order?
##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java:
##########
@@ -339,6 +344,7 @@ void testAssortedSchemaTransform(boolean batchMode) throws
Exception {
void testWildcardSchemaTransform(boolean batchMode) throws Exception {
String startupMode = batchMode ? "snapshot" : "initial";
String runtimeMode = batchMode ? "BATCH" : "STREAMING";
+ int testParallelism = 1;
Review Comment:
Why this case doesn't work in multiple parallelism mode?
##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java:
##########
@@ -224,56 +205,84 @@ void testSyncWholeDatabase() throws Exception {
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[106,
hammer, 16oz carpenters hammer, 1.0], after=[106, hammer, 18oz carpenter
hammer, 1.0], op=UPDATE, meta=()}");
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107,
rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks,
5.1], op=UPDATE, meta=()}");
- waitUntilSpecificEvent(
- "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1,
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS`
VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
Review Comment:
The original test case looks suspicious. Why `DEBEZIUM.CUSTOMERS`'s primary
key `ID INT NOT NULL` maps to a BIGINT and its value has changed?
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java:
##########
@@ -503,8 +508,11 @@ private void testRemoveAndAddCollectionsOneByOne(
// assert fetched changelog data in this round
MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
- MongoDBAssertUtils.assertEqualsInAnyOrder(
- fetchedDataList,
TestValuesTableFactory.getRawResultsAsStrings("sink"));
+ assertEqualsInAnyOrderWithAllowedDuplicateUpdatePair(
+ fetchedDataList,
+ TestValuesTableFactory.getRawResultsAsStrings("sink"),
+ collection0UpdateBefore,
+ collection0UpdateAfter);
Review Comment:
This assertion is really cryptic. IIUC it is basically asserting this:
```java
assertThat(TestValuesTableFactory.getRawResultsAsStrings("sink"))
.satisfiesAnyOf(
actual -> assertThat(actual)
.containsExactlyInAnyOrderElementsOf(expected),
actual -> assertThat(actual)
.containsExactlyInAnyOrderElementsOf(expectedWithRetryDuplicate));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]