Copilot commented on code in PR #4293:
URL: https://github.com/apache/flink-cdc/pull/4293#discussion_r2875896249


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -646,85 +646,100 @@ public void testDatabaseNameWithHyphenEndToEnd() throws 
Exception {
         FlinkSourceProvider sourceProvider =
                 (FlinkSourceProvider) dataSource.getEventSourceProvider();
 
-        CloseableIterator<Event> events =
+        DataStreamSource<Event> source =
                 testEnv.fromSource(
-                                sourceProvider.getSource(),
-                                WatermarkStrategy.noWatermarks(),
-                                PostgresDataSourceFactory.IDENTIFIER,
-                                new EventTypeInfo())
-                        .executeAndCollect();
+                        sourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        PostgresDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+
+        TypeSerializer<Event> serializer =
+                
source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
+        CheckpointedCollectResultBuffer<Event> resultBuffer =
+                new CheckpointedCollectResultBuffer<>(serializer);
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> iterator =
+                addCollector(testEnv, source, resultBuffer, serializer, 
accumulatorName);
 
-        // Collect events and verify data
-        List<Event> collectedEvents = new ArrayList<>();
-        int expectedDataCount = 3; // We inserted 3 rows
-        int dataCount = 0;
-        int maxEvents = 10; // Safety limit
-
-        while (events.hasNext() && collectedEvents.size() < maxEvents) {
-            Event event = events.next();
-            collectedEvents.add(event);
-            if (event instanceof DataChangeEvent) {
-                dataCount++;
-                if (dataCount >= expectedDataCount) {
-                    break;
+        JobClient jobClient = 
testEnv.executeAsync("testDatabaseNameWithHyphen");
+        iterator.setJobClient(jobClient);
+
+        try {
+            // Collect events and verify data
+            List<Event> collectedEvents = new ArrayList<>();
+            int expectedDataCount = 3; // We inserted 3 rows
+            int dataCount = 0;
+            int maxEvents = 10; // Safety limit
+
+            while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
+                Event event = iterator.next();
+                collectedEvents.add(event);
+                if (event instanceof DataChangeEvent) {
+                    dataCount++;
+                    if (dataCount >= expectedDataCount) {
+                        break;
+                    }
                 }
             }
-        }
-        events.close();
-
-        // Verify we received CreateTableEvent and DataChangeEvents
-        assertThat(collectedEvents).isNotEmpty();
 
-        // Check for CreateTableEvent
-        long createTableEventCount =
-                collectedEvents.stream().filter(e -> e instanceof 
CreateTableEvent).count();
-        assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
+            // Verify we received CreateTableEvent and DataChangeEvents
+            assertThat(collectedEvents).isNotEmpty();
 
-        // Check for DataChangeEvents (INSERT events from snapshot)
-        List<DataChangeEvent> dataChangeEvents =
-                collectedEvents.stream()
-                        .filter(e -> e instanceof DataChangeEvent)
-                        .map(e -> (DataChangeEvent) e)
-                        .collect(Collectors.toList());
+            // Check for CreateTableEvent
+            long createTableEventCount =
+                    collectedEvents.stream().filter(e -> e instanceof 
CreateTableEvent).count();
+            assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
 
-        assertThat(dataChangeEvents).hasSize(expectedDataCount);
+            // Check for DataChangeEvents (INSERT events from snapshot)
+            List<DataChangeEvent> dataChangeEvents =
+                    collectedEvents.stream()
+                            .filter(e -> e instanceof DataChangeEvent)
+                            .map(e -> (DataChangeEvent) e)
+                            .collect(Collectors.toList());
 
-        // Verify the table ID in events
-        for (DataChangeEvent dce : dataChangeEvents) {
-            assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
-            assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
-        }
+            assertThat(dataChangeEvents).hasSize(expectedDataCount);
 
-        // Verify the data content - we should have 3 INSERT events with ids 
1, 2, 3
-        List<Integer> actualIds =
-                dataChangeEvents.stream()
-                        .map(
-                                dce -> {
-                                    RecordData after = dce.after();
-                                    return after.getInt(0); // id column
-                                })
-                        .sorted()
-                        .collect(Collectors.toList());
-        assertThat(actualIds).containsExactly(1, 2, 3);
+            // Verify the table ID in events
+            for (DataChangeEvent dce : dataChangeEvents) {
+                assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
+                
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
+            }
 
-        // Cleanup - first drop replication slot, then terminate connections 
and drop database
-        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
-                Statement statement = connection.createStatement()) {
-            // Drop replication slot first (it was created during CDC 
connection)
-            try {
-                statement.execute(String.format("SELECT 
pg_drop_replication_slot('%s')", slotName));
-            } catch (SQLException e) {
-                // Ignore if slot doesn't exist
-                LOG.warn("Failed to drop replication slot: {}", 
e.getMessage());
+            // Verify the data content - we should have 3 INSERT events with 
ids 1, 2, 3
+            List<Integer> actualIds =
+                    dataChangeEvents.stream()
+                            .map(
+                                    dce -> {
+                                        RecordData after = dce.after();
+                                        return after.getInt(0); // id column
+                                    })
+                            .sorted()
+                            .collect(Collectors.toList());
+            assertThat(actualIds).containsExactly(1, 2, 3);
+        } finally {
+            // Cancel the job first to release the replication slot
+            iterator.close();
+            jobClient.cancel().get();
+
+            // Wait for the job to fully stop and release the replication slot
+            Thread.sleep(3000);
+
+            // Cleanup - drop replication slot, terminate connections and drop 
database
+            try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                    Statement statement = connection.createStatement()) {
+                try {
+                    statement.execute(
+                            String.format("SELECT 
pg_drop_replication_slot('%s')", slotName));
+                } catch (SQLException e) {
+                    LOG.warn("Failed to drop replication slot: {}", 
e.getMessage());
+                }

Review Comment:
   Dropping the replication slot is attempted only once, and failures are 
swallowed. Since this test specifically exercises slot-release timing, a 
single attempt can still fail intermittently if the slot remains active 
briefly after cancellation. Consider adding a small retry loop (with a 
deadline) that retries `pg_drop_replication_slot` on the "slot is active" 
error before giving up.
   ```suggestion
                   long dropSlotDeadlineMillis = System.currentTimeMillis() + 
10_000L;
                   boolean slotDropped = false;
                   while (!slotDropped && System.currentTimeMillis() < 
dropSlotDeadlineMillis) {
                       try {
                           statement.execute(
                                   String.format(
                                           "SELECT 
pg_drop_replication_slot('%s')", slotName));
                           slotDropped = true;
                       } catch (SQLException e) {
                           String sqlState = e.getSQLState();
                           String message = e.getMessage();
                           boolean slotActive =
                                   "55006".equals(sqlState)
                                           || (message != null
                                                   && 
message.toLowerCase().contains("is active"));
                           if (!slotActive) {
                               LOG.warn(
                                       "Failed to drop replication slot '{}': 
{}",
                                       slotName,
                                       e.getMessage());
                               break;
                           }
                           // Slot is still active; wait briefly and retry 
until the deadline.
                           Thread.sleep(200);
                       }
                   }
                   if (!slotDropped) {
                       LOG.warn(
                               "Failed to drop replication slot '{}' within 
retry deadline",
                               slotName);
                   }
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -646,85 +646,100 @@ public void testDatabaseNameWithHyphenEndToEnd() throws 
Exception {
         FlinkSourceProvider sourceProvider =
                 (FlinkSourceProvider) dataSource.getEventSourceProvider();
 
-        CloseableIterator<Event> events =
+        DataStreamSource<Event> source =
                 testEnv.fromSource(
-                                sourceProvider.getSource(),
-                                WatermarkStrategy.noWatermarks(),
-                                PostgresDataSourceFactory.IDENTIFIER,
-                                new EventTypeInfo())
-                        .executeAndCollect();
+                        sourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        PostgresDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+
+        TypeSerializer<Event> serializer =
+                
source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
+        CheckpointedCollectResultBuffer<Event> resultBuffer =
+                new CheckpointedCollectResultBuffer<>(serializer);
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> iterator =
+                addCollector(testEnv, source, resultBuffer, serializer, 
accumulatorName);
 
-        // Collect events and verify data
-        List<Event> collectedEvents = new ArrayList<>();
-        int expectedDataCount = 3; // We inserted 3 rows
-        int dataCount = 0;
-        int maxEvents = 10; // Safety limit
-
-        while (events.hasNext() && collectedEvents.size() < maxEvents) {
-            Event event = events.next();
-            collectedEvents.add(event);
-            if (event instanceof DataChangeEvent) {
-                dataCount++;
-                if (dataCount >= expectedDataCount) {
-                    break;
+        JobClient jobClient = 
testEnv.executeAsync("testDatabaseNameWithHyphen");
+        iterator.setJobClient(jobClient);
+
+        try {
+            // Collect events and verify data
+            List<Event> collectedEvents = new ArrayList<>();
+            int expectedDataCount = 3; // We inserted 3 rows
+            int dataCount = 0;
+            int maxEvents = 10; // Safety limit
+
+            while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
+                Event event = iterator.next();
+                collectedEvents.add(event);
+                if (event instanceof DataChangeEvent) {
+                    dataCount++;
+                    if (dataCount >= expectedDataCount) {
+                        break;
+                    }
                 }
             }
-        }
-        events.close();
-
-        // Verify we received CreateTableEvent and DataChangeEvents
-        assertThat(collectedEvents).isNotEmpty();
 
-        // Check for CreateTableEvent
-        long createTableEventCount =
-                collectedEvents.stream().filter(e -> e instanceof 
CreateTableEvent).count();
-        assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
+            // Verify we received CreateTableEvent and DataChangeEvents
+            assertThat(collectedEvents).isNotEmpty();
 
-        // Check for DataChangeEvents (INSERT events from snapshot)
-        List<DataChangeEvent> dataChangeEvents =
-                collectedEvents.stream()
-                        .filter(e -> e instanceof DataChangeEvent)
-                        .map(e -> (DataChangeEvent) e)
-                        .collect(Collectors.toList());
+            // Check for CreateTableEvent
+            long createTableEventCount =
+                    collectedEvents.stream().filter(e -> e instanceof 
CreateTableEvent).count();
+            assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
 
-        assertThat(dataChangeEvents).hasSize(expectedDataCount);
+            // Check for DataChangeEvents (INSERT events from snapshot)
+            List<DataChangeEvent> dataChangeEvents =
+                    collectedEvents.stream()
+                            .filter(e -> e instanceof DataChangeEvent)
+                            .map(e -> (DataChangeEvent) e)
+                            .collect(Collectors.toList());
 
-        // Verify the table ID in events
-        for (DataChangeEvent dce : dataChangeEvents) {
-            assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
-            assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
-        }
+            assertThat(dataChangeEvents).hasSize(expectedDataCount);
 
-        // Verify the data content - we should have 3 INSERT events with ids 
1, 2, 3
-        List<Integer> actualIds =
-                dataChangeEvents.stream()
-                        .map(
-                                dce -> {
-                                    RecordData after = dce.after();
-                                    return after.getInt(0); // id column
-                                })
-                        .sorted()
-                        .collect(Collectors.toList());
-        assertThat(actualIds).containsExactly(1, 2, 3);
+            // Verify the table ID in events
+            for (DataChangeEvent dce : dataChangeEvents) {
+                assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
+                
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
+            }
 
-        // Cleanup - first drop replication slot, then terminate connections 
and drop database
-        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
-                Statement statement = connection.createStatement()) {
-            // Drop replication slot first (it was created during CDC 
connection)
-            try {
-                statement.execute(String.format("SELECT 
pg_drop_replication_slot('%s')", slotName));
-            } catch (SQLException e) {
-                // Ignore if slot doesn't exist
-                LOG.warn("Failed to drop replication slot: {}", 
e.getMessage());
+            // Verify the data content - we should have 3 INSERT events with 
ids 1, 2, 3
+            List<Integer> actualIds =
+                    dataChangeEvents.stream()
+                            .map(
+                                    dce -> {
+                                        RecordData after = dce.after();
+                                        return after.getInt(0); // id column
+                                    })
+                            .sorted()
+                            .collect(Collectors.toList());
+            assertThat(actualIds).containsExactly(1, 2, 3);
+        } finally {
+            // Cancel the job first to release the replication slot
+            iterator.close();
+            jobClient.cancel().get();
+
+            // Wait for the job to fully stop and release the replication slot
+            Thread.sleep(3000);
+

Review Comment:
   The cleanup uses a fixed `Thread.sleep(3000)` to wait for the job to stop 
and release the replication slot. This is brittle in CI: the wait may be too 
short on slow runners, and it adds unnecessary latency on fast ones. Prefer 
polling for the job to reach a terminal status (e.g., CANCELED) and/or 
retrying the replication-slot drop until it succeeds or a deadline is 
reached.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -646,85 +646,100 @@ public void testDatabaseNameWithHyphenEndToEnd() throws 
Exception {
         FlinkSourceProvider sourceProvider =
                 (FlinkSourceProvider) dataSource.getEventSourceProvider();
 
-        CloseableIterator<Event> events =
+        DataStreamSource<Event> source =
                 testEnv.fromSource(
-                                sourceProvider.getSource(),
-                                WatermarkStrategy.noWatermarks(),
-                                PostgresDataSourceFactory.IDENTIFIER,
-                                new EventTypeInfo())
-                        .executeAndCollect();
+                        sourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        PostgresDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+
+        TypeSerializer<Event> serializer =
+                
source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
+        CheckpointedCollectResultBuffer<Event> resultBuffer =
+                new CheckpointedCollectResultBuffer<>(serializer);
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> iterator =
+                addCollector(testEnv, source, resultBuffer, serializer, 
accumulatorName);
 
-        // Collect events and verify data
-        List<Event> collectedEvents = new ArrayList<>();
-        int expectedDataCount = 3; // We inserted 3 rows
-        int dataCount = 0;
-        int maxEvents = 10; // Safety limit
-
-        while (events.hasNext() && collectedEvents.size() < maxEvents) {
-            Event event = events.next();
-            collectedEvents.add(event);
-            if (event instanceof DataChangeEvent) {
-                dataCount++;
-                if (dataCount >= expectedDataCount) {
-                    break;
+        JobClient jobClient = 
testEnv.executeAsync("testDatabaseNameWithHyphen");
+        iterator.setJobClient(jobClient);
+
+        try {
+            // Collect events and verify data
+            List<Event> collectedEvents = new ArrayList<>();
+            int expectedDataCount = 3; // We inserted 3 rows
+            int dataCount = 0;
+            int maxEvents = 10; // Safety limit
+
+            while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
+                Event event = iterator.next();
+                collectedEvents.add(event);
+                if (event instanceof DataChangeEvent) {
+                    dataCount++;
+                    if (dataCount >= expectedDataCount) {
+                        break;
+                    }
                 }
             }
-        }
-        events.close();
-
-        // Verify we received CreateTableEvent and DataChangeEvents
-        assertThat(collectedEvents).isNotEmpty();
 
-        // Check for CreateTableEvent
-        long createTableEventCount =
-                collectedEvents.stream().filter(e -> e instanceof 
CreateTableEvent).count();
-        assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
+            // Verify we received CreateTableEvent and DataChangeEvents
+            assertThat(collectedEvents).isNotEmpty();
 
-        // Check for DataChangeEvents (INSERT events from snapshot)
-        List<DataChangeEvent> dataChangeEvents =
-                collectedEvents.stream()
-                        .filter(e -> e instanceof DataChangeEvent)
-                        .map(e -> (DataChangeEvent) e)
-                        .collect(Collectors.toList());
+            // Check for CreateTableEvent
+            long createTableEventCount =
+                    collectedEvents.stream().filter(e -> e instanceof 
CreateTableEvent).count();
+            assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
 
-        assertThat(dataChangeEvents).hasSize(expectedDataCount);
+            // Check for DataChangeEvents (INSERT events from snapshot)
+            List<DataChangeEvent> dataChangeEvents =
+                    collectedEvents.stream()
+                            .filter(e -> e instanceof DataChangeEvent)
+                            .map(e -> (DataChangeEvent) e)
+                            .collect(Collectors.toList());
 
-        // Verify the table ID in events
-        for (DataChangeEvent dce : dataChangeEvents) {
-            assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
-            assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
-        }
+            assertThat(dataChangeEvents).hasSize(expectedDataCount);
 
-        // Verify the data content - we should have 3 INSERT events with ids 
1, 2, 3
-        List<Integer> actualIds =
-                dataChangeEvents.stream()
-                        .map(
-                                dce -> {
-                                    RecordData after = dce.after();
-                                    return after.getInt(0); // id column
-                                })
-                        .sorted()
-                        .collect(Collectors.toList());
-        assertThat(actualIds).containsExactly(1, 2, 3);
+            // Verify the table ID in events
+            for (DataChangeEvent dce : dataChangeEvents) {
+                assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
+                
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
+            }
 
-        // Cleanup - first drop replication slot, then terminate connections 
and drop database
-        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
-                Statement statement = connection.createStatement()) {
-            // Drop replication slot first (it was created during CDC 
connection)
-            try {
-                statement.execute(String.format("SELECT 
pg_drop_replication_slot('%s')", slotName));
-            } catch (SQLException e) {
-                // Ignore if slot doesn't exist
-                LOG.warn("Failed to drop replication slot: {}", 
e.getMessage());
+            // Verify the data content - we should have 3 INSERT events with 
ids 1, 2, 3
+            List<Integer> actualIds =
+                    dataChangeEvents.stream()
+                            .map(
+                                    dce -> {
+                                        RecordData after = dce.after();
+                                        return after.getInt(0); // id column
+                                    })
+                            .sorted()
+                            .collect(Collectors.toList());
+            assertThat(actualIds).containsExactly(1, 2, 3);
+        } finally {
+            // Cancel the job first to release the replication slot
+            iterator.close();
+            jobClient.cancel().get();
+

Review Comment:
   If `jobClient.cancel().get()` throws (e.g., because the job has already 
failed or terminated) or blocks for a long time, the subsequent DB/slot 
cleanup won't run, potentially leaking resources and causing later tests to 
flake. Consider wrapping the cancel/await in its own try/catch (and ideally 
using a bounded wait) so the JDBC cleanup always executes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to