lvyanquan commented on code in PR #4194:
URL: https://github.com/apache/flink-cdc/pull/4194#discussion_r2642635364
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -135,6 +135,127 @@ public void testInitialStartupMode() throws Exception {
         assertThat(inventoryDatabase.checkSlot(slotName)).isEqualTo(slotName);
     }
+
+    @Test
+    public void testLatestOffsetStartupMode() throws Exception {
+        inventoryDatabase.createAndInitialize();
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGRES_CONTAINER.getHost())
+                                .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(inventoryDatabase.getDatabaseName())
+                                .tableList("inventory.products")
+                                .startupOptions(StartupOptions.latest())
+                                .serverTimeZone("UTC");
+        configFactory.database(inventoryDatabase.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new PostgresDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+        Thread.sleep(10_000);
+        TableId tableId = TableId.tableId("inventory", "products");
+        CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);
+
+        List<Event> expectedBinlog = new ArrayList<>();
+        try (Connection connection =
+                        getJdbcConnection(POSTGRES_CONTAINER, inventoryDatabase.getDatabaseName());
+                Statement statement = connection.createStatement()) {
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(255).notNull(),
+                                DataTypes.VARCHAR(512),
+                                DataTypes.DOUBLE()
+                            },
+                            new String[] {"id", "name", "description", "weight"});
+            BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
+
+            // Insert new data
+            statement.execute(
+                    String.format(
+                            "INSERT INTO inventory.products (name, description, weight) VALUES ('scooter', 'New scooter', 5.5);",
+                            inventoryDatabase.getDatabaseName()));
+            expectedBinlog.add(
+                    DataChangeEvent.insertEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        110, // next id after initialization
+                                        BinaryStringData.fromString("scooter"),
+                                        BinaryStringData.fromString("New scooter"),
+                                        5.5
+                                    })));
+
+            statement.execute(
+                    String.format(
+                            "INSERT INTO inventory.products (name, description, weight) VALUES ('football', 'New football', 6.6);",
+                            inventoryDatabase.getDatabaseName()));
+            expectedBinlog.add(
+                    DataChangeEvent.insertEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        111, // next id after initialization
+                                        BinaryStringData.fromString("football"),
+                                        BinaryStringData.fromString("New football"),
+                                        6.6
+                                    })));
+
+            // Update existing data
+            statement.execute(
+                    String.format(
+                            "UPDATE inventory.products SET description = 'Updated description' WHERE id = 101;"));
+            expectedBinlog.add(
+                    DataChangeEvent.updateEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        101,
+                                        BinaryStringData.fromString("scooter"),
+                                        BinaryStringData.fromString("Small 2-wheel scooter"),
+                                        3.14
+                                    }),
+                            generator.generate(
+                                    new Object[] {
+                                        101,
+                                        BinaryStringData.fromString("scooter"),
+                                        BinaryStringData.fromString("Updated description"),
+                                        3.14
+                                    })));
+
+            // Wait for the events to be processed
+            Thread.sleep(5_000);
+        }
+
+        // Collect the actual events
+        List<Event> actualEvents =
+                fetchResultsExcept(events, expectedBinlog.size(), createTableEvent);
+
+        // Filter out schema change events and keep only data change events
+        List<Event> actualDataChangeEvents =
+                actualEvents.stream()
+                        .filter(event -> event instanceof DataChangeEvent)
+                        .collect(Collectors.toList());
+
+        // Verify that we captured the expected number of data change events
+        assertThat(actualDataChangeEvents.size()).isGreaterThanOrEqualTo(expectedBinlog.size());
Review Comment:
It would be better to also add a scenario here that creates a savepoint and then
restores reading from it.
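
For reference, a rough sketch of what such a scenario could look like, following
the savepoint/restore pattern used in other CDC connector ITCases. The helper
names (`buildPostgresSource`, `insertChangesWhileRunning`, `insertChangesWhileDown`,
`expectedOfflineEvents`, `fetchResults`, `savepointDir`) are hypothetical
placeholders, and the `stopWithSavepoint`/`SavepointConfigOptions` calls assume a
recent Flink version:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

@Test
public void testLatestOffsetStartupModeFromSavepoint() throws Exception {
    // Phase 1: run the pipeline with checkpointing enabled so a savepoint can be taken.
    StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
    env1.enableCheckpointing(1000L);
    env1.fromSource(
                    buildPostgresSource(), // hypothetical helper wrapping the config above
                    WatermarkStrategy.noWatermarks(),
                    PostgresDataSourceFactory.IDENTIFIER,
                    new EventTypeInfo())
            .addSink(new DiscardingSink<>());
    JobClient jobClient = env1.executeAsync("before-savepoint");

    insertChangesWhileRunning(); // hypothetical: changes consumed before the savepoint
    String savepointPath =
            jobClient
                    .stopWithSavepoint(false, savepointDir, SavepointFormatType.CANONICAL)
                    .get();

    insertChangesWhileDown(); // hypothetical: changes made while the job is stopped

    // Phase 2: restore from the savepoint. Only the changes made while the job was
    // down should be re-emitted, which proves the offset state was restored.
    Configuration restoreConf = new Configuration();
    restoreConf.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
    StreamExecutionEnvironment env2 =
            StreamExecutionEnvironment.getExecutionEnvironment(restoreConf);
    CloseableIterator<Event> restored =
            env2.fromSource(
                            buildPostgresSource(),
                            WatermarkStrategy.noWatermarks(),
                            PostgresDataSourceFactory.IDENTIFIER,
                            new EventTypeInfo())
                    .executeAndCollect();
    assertThat(fetchResults(restored, expectedOfflineEvents().size()))
            .containsExactlyInAnyOrderElementsOf(expectedOfflineEvents());
}
```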
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java:
##########
@@ -160,6 +161,8 @@ protected void emitElement(SourceRecord element, SourceOutput<T> output) throws
         debeziumDeserializationSchema.deserialize(element, outputCollector);
     }
+
+    public void applySplit(SourceSplitBase split) {}
Review Comment:
It's better to add some Javadoc for this newly added public method.
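
For example, something along these lines (the wording below is only a suggestion;
the exact semantics should reflect how the PR actually uses this hook):

```java
    /**
     * Applies the given split to this record emitter before records of that split
     * are emitted.
     *
     * <p>The default implementation is a no-op; subclasses may override it to update
     * per-split state (e.g. table schemas or offsets) when the reader switches to a
     * new split.
     *
     * @param split the split that is about to be processed
     */
    public void applySplit(SourceSplitBase split) {}
```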