Copilot commented on code in PR #4293:
URL: https://github.com/apache/flink-cdc/pull/4293#discussion_r2875896249
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -646,85 +646,100 @@ public void testDatabaseNameWithHyphenEndToEnd() throws
Exception {
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) dataSource.getEventSourceProvider();
- CloseableIterator<Event> events =
+ DataStreamSource<Event> source =
testEnv.fromSource(
- sourceProvider.getSource(),
- WatermarkStrategy.noWatermarks(),
- PostgresDataSourceFactory.IDENTIFIER,
- new EventTypeInfo())
- .executeAndCollect();
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo());
+
+ TypeSerializer<Event> serializer =
+
source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
+ CheckpointedCollectResultBuffer<Event> resultBuffer =
+ new CheckpointedCollectResultBuffer<>(serializer);
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectResultIterator<Event> iterator =
+ addCollector(testEnv, source, resultBuffer, serializer,
accumulatorName);
- // Collect events and verify data
- List<Event> collectedEvents = new ArrayList<>();
- int expectedDataCount = 3; // We inserted 3 rows
- int dataCount = 0;
- int maxEvents = 10; // Safety limit
-
- while (events.hasNext() && collectedEvents.size() < maxEvents) {
- Event event = events.next();
- collectedEvents.add(event);
- if (event instanceof DataChangeEvent) {
- dataCount++;
- if (dataCount >= expectedDataCount) {
- break;
+ JobClient jobClient =
testEnv.executeAsync("testDatabaseNameWithHyphen");
+ iterator.setJobClient(jobClient);
+
+ try {
+ // Collect events and verify data
+ List<Event> collectedEvents = new ArrayList<>();
+ int expectedDataCount = 3; // We inserted 3 rows
+ int dataCount = 0;
+ int maxEvents = 10; // Safety limit
+
+ while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
+ Event event = iterator.next();
+ collectedEvents.add(event);
+ if (event instanceof DataChangeEvent) {
+ dataCount++;
+ if (dataCount >= expectedDataCount) {
+ break;
+ }
}
}
- }
- events.close();
-
- // Verify we received CreateTableEvent and DataChangeEvents
- assertThat(collectedEvents).isNotEmpty();
- // Check for CreateTableEvent
- long createTableEventCount =
- collectedEvents.stream().filter(e -> e instanceof
CreateTableEvent).count();
- assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
+ // Verify we received CreateTableEvent and DataChangeEvents
+ assertThat(collectedEvents).isNotEmpty();
- // Check for DataChangeEvents (INSERT events from snapshot)
- List<DataChangeEvent> dataChangeEvents =
- collectedEvents.stream()
- .filter(e -> e instanceof DataChangeEvent)
- .map(e -> (DataChangeEvent) e)
- .collect(Collectors.toList());
+ // Check for CreateTableEvent
+ long createTableEventCount =
+ collectedEvents.stream().filter(e -> e instanceof
CreateTableEvent).count();
+ assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
- assertThat(dataChangeEvents).hasSize(expectedDataCount);
+ // Check for DataChangeEvents (INSERT events from snapshot)
+ List<DataChangeEvent> dataChangeEvents =
+ collectedEvents.stream()
+ .filter(e -> e instanceof DataChangeEvent)
+ .map(e -> (DataChangeEvent) e)
+ .collect(Collectors.toList());
- // Verify the table ID in events
- for (DataChangeEvent dce : dataChangeEvents) {
- assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
- assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
- }
+ assertThat(dataChangeEvents).hasSize(expectedDataCount);
- // Verify the data content - we should have 3 INSERT events with ids
1, 2, 3
- List<Integer> actualIds =
- dataChangeEvents.stream()
- .map(
- dce -> {
- RecordData after = dce.after();
- return after.getInt(0); // id column
- })
- .sorted()
- .collect(Collectors.toList());
- assertThat(actualIds).containsExactly(1, 2, 3);
+ // Verify the table ID in events
+ for (DataChangeEvent dce : dataChangeEvents) {
+ assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
+
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
+ }
- // Cleanup - first drop replication slot, then terminate connections
and drop database
- try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
- Statement statement = connection.createStatement()) {
- // Drop replication slot first (it was created during CDC
connection)
- try {
- statement.execute(String.format("SELECT
pg_drop_replication_slot('%s')", slotName));
- } catch (SQLException e) {
- // Ignore if slot doesn't exist
- LOG.warn("Failed to drop replication slot: {}",
e.getMessage());
+ // Verify the data content - we should have 3 INSERT events with
ids 1, 2, 3
+ List<Integer> actualIds =
+ dataChangeEvents.stream()
+ .map(
+ dce -> {
+ RecordData after = dce.after();
+ return after.getInt(0); // id column
+ })
+ .sorted()
+ .collect(Collectors.toList());
+ assertThat(actualIds).containsExactly(1, 2, 3);
+ } finally {
+ // Cancel the job first to release the replication slot
+ iterator.close();
+ jobClient.cancel().get();
+
+ // Wait for the job to fully stop and release the replication slot
+ Thread.sleep(3000);
+
+ // Cleanup - drop replication slot, terminate connections and drop
database
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+ Statement statement = connection.createStatement()) {
+ try {
+ statement.execute(
+ String.format("SELECT
pg_drop_replication_slot('%s')", slotName));
+ } catch (SQLException e) {
+ LOG.warn("Failed to drop replication slot: {}",
e.getMessage());
+ }
Review Comment:
Dropping the replication slot is attempted only once, and a failure is merely
logged at WARN level rather than handled. Since this test is specifically
dealing with slot-release timing, a single attempt can still be flaky if the
slot remains active briefly after cancellation. Consider adding a small retry
loop (with a deadline) that retries `pg_drop_replication_slot` on the "slot is
active" error (SQLSTATE 55006) before giving up.
```suggestion
long dropSlotDeadlineMillis = System.currentTimeMillis() + 10_000L;
boolean slotDropped = false;
while (!slotDropped && System.currentTimeMillis() < dropSlotDeadlineMillis) {
try {
statement.execute(
String.format(
"SELECT pg_drop_replication_slot('%s')", slotName));
slotDropped = true;
} catch (SQLException e) {
String sqlState = e.getSQLState();
String message = e.getMessage();
boolean slotActive =
"55006".equals(sqlState)
|| (message != null
&& message.toLowerCase().contains("is active"));
if (!slotActive) {
LOG.warn(
"Failed to drop replication slot '{}': {}",
slotName,
e.getMessage());
break;
}
// Slot is still active; wait briefly and retry until the deadline.
Thread.sleep(200);
}
}
if (!slotDropped) {
LOG.warn(
"Failed to drop replication slot '{}' within retry deadline",
slotName);
}
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -646,85 +646,100 @@ public void testDatabaseNameWithHyphenEndToEnd() throws
Exception {
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) dataSource.getEventSourceProvider();
- CloseableIterator<Event> events =
+ DataStreamSource<Event> source =
testEnv.fromSource(
- sourceProvider.getSource(),
- WatermarkStrategy.noWatermarks(),
- PostgresDataSourceFactory.IDENTIFIER,
- new EventTypeInfo())
- .executeAndCollect();
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo());
+
+ TypeSerializer<Event> serializer =
+
source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
+ CheckpointedCollectResultBuffer<Event> resultBuffer =
+ new CheckpointedCollectResultBuffer<>(serializer);
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectResultIterator<Event> iterator =
+ addCollector(testEnv, source, resultBuffer, serializer,
accumulatorName);
- // Collect events and verify data
- List<Event> collectedEvents = new ArrayList<>();
- int expectedDataCount = 3; // We inserted 3 rows
- int dataCount = 0;
- int maxEvents = 10; // Safety limit
-
- while (events.hasNext() && collectedEvents.size() < maxEvents) {
- Event event = events.next();
- collectedEvents.add(event);
- if (event instanceof DataChangeEvent) {
- dataCount++;
- if (dataCount >= expectedDataCount) {
- break;
+ JobClient jobClient =
testEnv.executeAsync("testDatabaseNameWithHyphen");
+ iterator.setJobClient(jobClient);
+
+ try {
+ // Collect events and verify data
+ List<Event> collectedEvents = new ArrayList<>();
+ int expectedDataCount = 3; // We inserted 3 rows
+ int dataCount = 0;
+ int maxEvents = 10; // Safety limit
+
+ while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
+ Event event = iterator.next();
+ collectedEvents.add(event);
+ if (event instanceof DataChangeEvent) {
+ dataCount++;
+ if (dataCount >= expectedDataCount) {
+ break;
+ }
}
}
- }
- events.close();
-
- // Verify we received CreateTableEvent and DataChangeEvents
- assertThat(collectedEvents).isNotEmpty();
- // Check for CreateTableEvent
- long createTableEventCount =
- collectedEvents.stream().filter(e -> e instanceof
CreateTableEvent).count();
- assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
+ // Verify we received CreateTableEvent and DataChangeEvents
+ assertThat(collectedEvents).isNotEmpty();
- // Check for DataChangeEvents (INSERT events from snapshot)
- List<DataChangeEvent> dataChangeEvents =
- collectedEvents.stream()
- .filter(e -> e instanceof DataChangeEvent)
- .map(e -> (DataChangeEvent) e)
- .collect(Collectors.toList());
+ // Check for CreateTableEvent
+ long createTableEventCount =
+ collectedEvents.stream().filter(e -> e instanceof
CreateTableEvent).count();
+ assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
- assertThat(dataChangeEvents).hasSize(expectedDataCount);
+ // Check for DataChangeEvents (INSERT events from snapshot)
+ List<DataChangeEvent> dataChangeEvents =
+ collectedEvents.stream()
+ .filter(e -> e instanceof DataChangeEvent)
+ .map(e -> (DataChangeEvent) e)
+ .collect(Collectors.toList());
- // Verify the table ID in events
- for (DataChangeEvent dce : dataChangeEvents) {
- assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
- assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
- }
+ assertThat(dataChangeEvents).hasSize(expectedDataCount);
- // Verify the data content - we should have 3 INSERT events with ids
1, 2, 3
- List<Integer> actualIds =
- dataChangeEvents.stream()
- .map(
- dce -> {
- RecordData after = dce.after();
- return after.getInt(0); // id column
- })
- .sorted()
- .collect(Collectors.toList());
- assertThat(actualIds).containsExactly(1, 2, 3);
+ // Verify the table ID in events
+ for (DataChangeEvent dce : dataChangeEvents) {
+ assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
+
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
+ }
- // Cleanup - first drop replication slot, then terminate connections
and drop database
- try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
- Statement statement = connection.createStatement()) {
- // Drop replication slot first (it was created during CDC
connection)
- try {
- statement.execute(String.format("SELECT
pg_drop_replication_slot('%s')", slotName));
- } catch (SQLException e) {
- // Ignore if slot doesn't exist
- LOG.warn("Failed to drop replication slot: {}",
e.getMessage());
+ // Verify the data content - we should have 3 INSERT events with
ids 1, 2, 3
+ List<Integer> actualIds =
+ dataChangeEvents.stream()
+ .map(
+ dce -> {
+ RecordData after = dce.after();
+ return after.getInt(0); // id column
+ })
+ .sorted()
+ .collect(Collectors.toList());
+ assertThat(actualIds).containsExactly(1, 2, 3);
+ } finally {
+ // Cancel the job first to release the replication slot
+ iterator.close();
+ jobClient.cancel().get();
+
+ // Wait for the job to fully stop and release the replication slot
+ Thread.sleep(3000);
+
Review Comment:
The cleanup uses a fixed `Thread.sleep(3000)` to wait for the job to stop
and release the replication slot. This is brittle in CI (may be too short on
slow runners and adds unnecessary latency on fast ones). Prefer polling for the
job to reach a terminal status (e.g., CANCELED) and/or retrying
replication-slot drop until it succeeds or a deadline is reached.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -646,85 +646,100 @@ public void testDatabaseNameWithHyphenEndToEnd() throws
Exception {
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) dataSource.getEventSourceProvider();
- CloseableIterator<Event> events =
+ DataStreamSource<Event> source =
testEnv.fromSource(
- sourceProvider.getSource(),
- WatermarkStrategy.noWatermarks(),
- PostgresDataSourceFactory.IDENTIFIER,
- new EventTypeInfo())
- .executeAndCollect();
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo());
+
+ TypeSerializer<Event> serializer =
+
source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
+ CheckpointedCollectResultBuffer<Event> resultBuffer =
+ new CheckpointedCollectResultBuffer<>(serializer);
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectResultIterator<Event> iterator =
+ addCollector(testEnv, source, resultBuffer, serializer,
accumulatorName);
- // Collect events and verify data
- List<Event> collectedEvents = new ArrayList<>();
- int expectedDataCount = 3; // We inserted 3 rows
- int dataCount = 0;
- int maxEvents = 10; // Safety limit
-
- while (events.hasNext() && collectedEvents.size() < maxEvents) {
- Event event = events.next();
- collectedEvents.add(event);
- if (event instanceof DataChangeEvent) {
- dataCount++;
- if (dataCount >= expectedDataCount) {
- break;
+ JobClient jobClient =
testEnv.executeAsync("testDatabaseNameWithHyphen");
+ iterator.setJobClient(jobClient);
+
+ try {
+ // Collect events and verify data
+ List<Event> collectedEvents = new ArrayList<>();
+ int expectedDataCount = 3; // We inserted 3 rows
+ int dataCount = 0;
+ int maxEvents = 10; // Safety limit
+
+ while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
+ Event event = iterator.next();
+ collectedEvents.add(event);
+ if (event instanceof DataChangeEvent) {
+ dataCount++;
+ if (dataCount >= expectedDataCount) {
+ break;
+ }
}
}
- }
- events.close();
-
- // Verify we received CreateTableEvent and DataChangeEvents
- assertThat(collectedEvents).isNotEmpty();
- // Check for CreateTableEvent
- long createTableEventCount =
- collectedEvents.stream().filter(e -> e instanceof
CreateTableEvent).count();
- assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
+ // Verify we received CreateTableEvent and DataChangeEvents
+ assertThat(collectedEvents).isNotEmpty();
- // Check for DataChangeEvents (INSERT events from snapshot)
- List<DataChangeEvent> dataChangeEvents =
- collectedEvents.stream()
- .filter(e -> e instanceof DataChangeEvent)
- .map(e -> (DataChangeEvent) e)
- .collect(Collectors.toList());
+ // Check for CreateTableEvent
+ long createTableEventCount =
+ collectedEvents.stream().filter(e -> e instanceof
CreateTableEvent).count();
+ assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
- assertThat(dataChangeEvents).hasSize(expectedDataCount);
+ // Check for DataChangeEvents (INSERT events from snapshot)
+ List<DataChangeEvent> dataChangeEvents =
+ collectedEvents.stream()
+ .filter(e -> e instanceof DataChangeEvent)
+ .map(e -> (DataChangeEvent) e)
+ .collect(Collectors.toList());
- // Verify the table ID in events
- for (DataChangeEvent dce : dataChangeEvents) {
- assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
- assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
- }
+ assertThat(dataChangeEvents).hasSize(expectedDataCount);
- // Verify the data content - we should have 3 INSERT events with ids
1, 2, 3
- List<Integer> actualIds =
- dataChangeEvents.stream()
- .map(
- dce -> {
- RecordData after = dce.after();
- return after.getInt(0); // id column
- })
- .sorted()
- .collect(Collectors.toList());
- assertThat(actualIds).containsExactly(1, 2, 3);
+ // Verify the table ID in events
+ for (DataChangeEvent dce : dataChangeEvents) {
+ assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
+
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
+ }
- // Cleanup - first drop replication slot, then terminate connections
and drop database
- try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
- Statement statement = connection.createStatement()) {
- // Drop replication slot first (it was created during CDC
connection)
- try {
- statement.execute(String.format("SELECT
pg_drop_replication_slot('%s')", slotName));
- } catch (SQLException e) {
- // Ignore if slot doesn't exist
- LOG.warn("Failed to drop replication slot: {}",
e.getMessage());
+ // Verify the data content - we should have 3 INSERT events with
ids 1, 2, 3
+ List<Integer> actualIds =
+ dataChangeEvents.stream()
+ .map(
+ dce -> {
+ RecordData after = dce.after();
+ return after.getInt(0); // id column
+ })
+ .sorted()
+ .collect(Collectors.toList());
+ assertThat(actualIds).containsExactly(1, 2, 3);
+ } finally {
+ // Cancel the job first to release the replication slot
+ iterator.close();
+ jobClient.cancel().get();
+
Review Comment:
If `jobClient.cancel().get()` throws (e.g., job already failed/terminated)
or blocks for a long time, the subsequent DB/slot cleanup won't run,
potentially leaking resources and causing later tests to flake. Consider
wrapping the cancel/await in its own try/catch (and ideally using a bounded
wait) so the JDBC cleanup always executes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]