This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 31d68ac28 [chore][test] Fix flaky postgres pipeline test case (#4293)
31d68ac28 is described below

commit 31d68ac28156b89878c9ac12e2f90c59d98e7626
Author: Jia Fan <[email protected]>
AuthorDate: Wed Mar 4 09:51:42 2026 +0800

    [chore][test] Fix flaky postgres pipeline test case (#4293)
---
 .../source/PostgresPipelineITCaseTest.java         | 153 ++++++++++++---------
 1 file changed, 86 insertions(+), 67 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index 40b87f5d4..acecc2bac 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -646,85 +646,104 @@ public class PostgresPipelineITCaseTest extends 
PostgresTestBase {
         FlinkSourceProvider sourceProvider =
                 (FlinkSourceProvider) dataSource.getEventSourceProvider();
 
-        CloseableIterator<Event> events =
+        DataStreamSource<Event> source =
                 testEnv.fromSource(
-                                sourceProvider.getSource(),
-                                WatermarkStrategy.noWatermarks(),
-                                PostgresDataSourceFactory.IDENTIFIER,
-                                new EventTypeInfo())
-                        .executeAndCollect();
+                        sourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        PostgresDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
 
-        // Collect events and verify data
-        List<Event> collectedEvents = new ArrayList<>();
-        int expectedDataCount = 3; // We inserted 3 rows
-        int dataCount = 0;
-        int maxEvents = 10; // Safety limit
-
-        while (events.hasNext() && collectedEvents.size() < maxEvents) {
-            Event event = events.next();
-            collectedEvents.add(event);
-            if (event instanceof DataChangeEvent) {
-                dataCount++;
-                if (dataCount >= expectedDataCount) {
-                    break;
+        TypeSerializer<Event> serializer =
+                
source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
+        CheckpointedCollectResultBuffer<Event> resultBuffer =
+                new CheckpointedCollectResultBuffer<>(serializer);
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> iterator =
+                addCollector(testEnv, source, resultBuffer, serializer, 
accumulatorName);
+
+        JobClient jobClient = 
testEnv.executeAsync("testDatabaseNameWithHyphen");
+        iterator.setJobClient(jobClient);
+
+        try {
+            // Collect events and verify data
+            List<Event> collectedEvents = new ArrayList<>();
+            int expectedDataCount = 3; // We inserted 3 rows
+            int dataCount = 0;
+            int maxEvents = 10; // Safety limit
+
+            while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
+                Event event = iterator.next();
+                collectedEvents.add(event);
+                if (event instanceof DataChangeEvent) {
+                    dataCount++;
+                    if (dataCount >= expectedDataCount) {
+                        break;
+                    }
                 }
             }
-        }
-        events.close();
 
-        // Verify we received CreateTableEvent and DataChangeEvents
-        assertThat(collectedEvents).isNotEmpty();
+            // Verify we received CreateTableEvent and DataChangeEvents
+            assertThat(collectedEvents).isNotEmpty();
 
-        // Check for CreateTableEvent
-        long createTableEventCount =
-                collectedEvents.stream().filter(e -> e instanceof 
CreateTableEvent).count();
-        assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
+            // Check for CreateTableEvent
+            long createTableEventCount =
+                    collectedEvents.stream().filter(e -> e instanceof 
CreateTableEvent).count();
+            assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
 
-        // Check for DataChangeEvents (INSERT events from snapshot)
-        List<DataChangeEvent> dataChangeEvents =
-                collectedEvents.stream()
-                        .filter(e -> e instanceof DataChangeEvent)
-                        .map(e -> (DataChangeEvent) e)
-                        .collect(Collectors.toList());
+            // Check for DataChangeEvents (INSERT events from snapshot)
+            List<DataChangeEvent> dataChangeEvents =
+                    collectedEvents.stream()
+                            .filter(e -> e instanceof DataChangeEvent)
+                            .map(e -> (DataChangeEvent) e)
+                            .collect(Collectors.toList());
 
-        assertThat(dataChangeEvents).hasSize(expectedDataCount);
-
-        // Verify the table ID in events
-        for (DataChangeEvent dce : dataChangeEvents) {
-            assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
-            assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
-        }
+            assertThat(dataChangeEvents).hasSize(expectedDataCount);
 
-        // Verify the data content - we should have 3 INSERT events with ids 
1, 2, 3
-        List<Integer> actualIds =
-                dataChangeEvents.stream()
-                        .map(
-                                dce -> {
-                                    RecordData after = dce.after();
-                                    return after.getInt(0); // id column
-                                })
-                        .sorted()
-                        .collect(Collectors.toList());
-        assertThat(actualIds).containsExactly(1, 2, 3);
+            // Verify the table ID in events
+            for (DataChangeEvent dce : dataChangeEvents) {
+                assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
+                
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
+            }
 
-        // Cleanup - first drop replication slot, then terminate connections 
and drop database
-        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
-                Statement statement = connection.createStatement()) {
-            // Drop replication slot first (it was created during CDC 
connection)
+            // Verify the data content - we should have 3 INSERT events with 
ids 1, 2, 3
+            List<Integer> actualIds =
+                    dataChangeEvents.stream()
+                            .map(
+                                    dce -> {
+                                        RecordData after = dce.after();
+                                        return after.getInt(0); // id column
+                                    })
+                            .sorted()
+                            .collect(Collectors.toList());
+            assertThat(actualIds).containsExactly(1, 2, 3);
+        } finally {
+            // Cancel the job with a bounded wait so cleanup always runs
             try {
-                statement.execute(String.format("SELECT 
pg_drop_replication_slot('%s')", slotName));
-            } catch (SQLException e) {
-                // Ignore if slot doesn't exist
-                LOG.warn("Failed to drop replication slot: {}", 
e.getMessage());
+                iterator.close();
+                jobClient.cancel().get();
+            } catch (Exception e) {
+                LOG.warn("Failed to cancel job: {}", e.getMessage());
+            }
+
+            // Wait for the job to fully stop and release the replication slot
+            Thread.sleep(3000);
+
+            // Cleanup - drop replication slot, terminate connections and drop 
database
+            try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                    Statement statement = connection.createStatement()) {
+                try {
+                    statement.execute(
+                            String.format("SELECT 
pg_drop_replication_slot('%s')", slotName));
+                } catch (SQLException e) {
+                    LOG.warn("Failed to drop replication slot: {}", 
e.getMessage());
+                }
+                statement.execute(
+                        "SELECT pg_terminate_backend(pid) FROM 
pg_stat_activity WHERE datname = '"
+                                + hyphenDbName
+                                + "'");
+                Thread.sleep(500);
+                statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName 
+ "\"");
             }
-            // Terminate all connections to the database
-            statement.execute(
-                    "SELECT pg_terminate_backend(pid) FROM pg_stat_activity 
WHERE datname = '"
-                            + hyphenDbName
-                            + "'");
-            // Small delay to ensure connections are terminated
-            Thread.sleep(500);
-            statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + 
"\"");
         }
     }
 

Reply via email to