This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 31d68ac28 [chore][test] Fix flaky postgres pipeline test case (#4293)
31d68ac28 is described below
commit 31d68ac28156b89878c9ac12e2f90c59d98e7626
Author: Jia Fan <[email protected]>
AuthorDate: Wed Mar 4 09:51:42 2026 +0800
[chore][test] Fix flaky postgres pipeline test case (#4293)
---
.../source/PostgresPipelineITCaseTest.java | 153 ++++++++++++---------
1 file changed, 86 insertions(+), 67 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index 40b87f5d4..acecc2bac 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -646,85 +646,104 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) dataSource.getEventSourceProvider();
- CloseableIterator<Event> events =
+ DataStreamSource<Event> source =
testEnv.fromSource(
- sourceProvider.getSource(),
- WatermarkStrategy.noWatermarks(),
- PostgresDataSourceFactory.IDENTIFIER,
- new EventTypeInfo())
- .executeAndCollect();
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo());
- // Collect events and verify data
- List<Event> collectedEvents = new ArrayList<>();
- int expectedDataCount = 3; // We inserted 3 rows
- int dataCount = 0;
- int maxEvents = 10; // Safety limit
-
- while (events.hasNext() && collectedEvents.size() < maxEvents) {
- Event event = events.next();
- collectedEvents.add(event);
- if (event instanceof DataChangeEvent) {
- dataCount++;
- if (dataCount >= expectedDataCount) {
- break;
+        TypeSerializer<Event> serializer =
+                source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
+ CheckpointedCollectResultBuffer<Event> resultBuffer =
+ new CheckpointedCollectResultBuffer<>(serializer);
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectResultIterator<Event> iterator =
+                addCollector(testEnv, source, resultBuffer, serializer, accumulatorName);
+
+        JobClient jobClient = testEnv.executeAsync("testDatabaseNameWithHyphen");
+ iterator.setJobClient(jobClient);
+
+ try {
+ // Collect events and verify data
+ List<Event> collectedEvents = new ArrayList<>();
+ int expectedDataCount = 3; // We inserted 3 rows
+ int dataCount = 0;
+ int maxEvents = 10; // Safety limit
+
+ while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
+ Event event = iterator.next();
+ collectedEvents.add(event);
+ if (event instanceof DataChangeEvent) {
+ dataCount++;
+ if (dataCount >= expectedDataCount) {
+ break;
+ }
}
}
- }
- events.close();
- // Verify we received CreateTableEvent and DataChangeEvents
- assertThat(collectedEvents).isNotEmpty();
+ // Verify we received CreateTableEvent and DataChangeEvents
+ assertThat(collectedEvents).isNotEmpty();
- // Check for CreateTableEvent
- long createTableEventCount =
-                collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
- assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
+ // Check for CreateTableEvent
+ long createTableEventCount =
+                    collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
+ assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
- // Check for DataChangeEvents (INSERT events from snapshot)
- List<DataChangeEvent> dataChangeEvents =
- collectedEvents.stream()
- .filter(e -> e instanceof DataChangeEvent)
- .map(e -> (DataChangeEvent) e)
- .collect(Collectors.toList());
+ // Check for DataChangeEvents (INSERT events from snapshot)
+ List<DataChangeEvent> dataChangeEvents =
+ collectedEvents.stream()
+ .filter(e -> e instanceof DataChangeEvent)
+ .map(e -> (DataChangeEvent) e)
+ .collect(Collectors.toList());
- assertThat(dataChangeEvents).hasSize(expectedDataCount);
-
- // Verify the table ID in events
- for (DataChangeEvent dce : dataChangeEvents) {
- assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
- assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
- }
+ assertThat(dataChangeEvents).hasSize(expectedDataCount);
-        // Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
- List<Integer> actualIds =
- dataChangeEvents.stream()
- .map(
- dce -> {
- RecordData after = dce.after();
- return after.getInt(0); // id column
- })
- .sorted()
- .collect(Collectors.toList());
- assertThat(actualIds).containsExactly(1, 2, 3);
+ // Verify the table ID in events
+ for (DataChangeEvent dce : dataChangeEvents) {
+ assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
+                assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
+ }
-        // Cleanup - first drop replication slot, then terminate connections and drop database
- try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
- Statement statement = connection.createStatement()) {
-            // Drop replication slot first (it was created during CDC connection)
+            // Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
+ List<Integer> actualIds =
+ dataChangeEvents.stream()
+ .map(
+ dce -> {
+ RecordData after = dce.after();
+ return after.getInt(0); // id column
+ })
+ .sorted()
+ .collect(Collectors.toList());
+ assertThat(actualIds).containsExactly(1, 2, 3);
+ } finally {
+ // Cancel the job with a bounded wait so cleanup always runs
try {
-                statement.execute(String.format("SELECT pg_drop_replication_slot('%s')", slotName));
- } catch (SQLException e) {
- // Ignore if slot doesn't exist
-                LOG.warn("Failed to drop replication slot: {}", e.getMessage());
+ iterator.close();
+ jobClient.cancel().get();
+ } catch (Exception e) {
+ LOG.warn("Failed to cancel job: {}", e.getMessage());
+ }
+
+ // Wait for the job to fully stop and release the replication slot
+ Thread.sleep(3000);
+
+            // Cleanup - drop replication slot, terminate connections and drop database
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+ Statement statement = connection.createStatement()) {
+ try {
+ statement.execute(
+                            String.format("SELECT pg_drop_replication_slot('%s')", slotName));
+ } catch (SQLException e) {
+                    LOG.warn("Failed to drop replication slot: {}", e.getMessage());
+ }
+ statement.execute(
+                        "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
+ + hyphenDbName
+ + "'");
+ Thread.sleep(500);
+                statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
}
- // Terminate all connections to the database
- statement.execute(
-                    "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
- + hyphenDbName
- + "'");
- // Small delay to ensure connections are terminated
- Thread.sleep(500);
-            statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
}
}