github-actions[bot] commented on code in PR #64075:
URL: https://github.com/apache/doris/pull/64075#discussion_r3348288448
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -150,6 +157,43 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
}
}
+ /** Create/ensure the Doris-owned publication for all include_tables (idempotent, multi-BE safe). */
+ private void createPublicationForDorisOwned(
+ PostgresDialect dialect, Map<String, String> config, String jobId) {
+ String pubName = resolvePublicationName(config, jobId);
+ String schema = config.get(DataSourceConfigKeys.SCHEMA);
+ String[] qualified = ConfigUtil.getTableList(schema, config);
+ if (qualified.length == 0) {
+ throw new CdcClientException("No tables to create publication " + pubName);
+ }
+ String tableList =
+ Arrays.stream(qualified)
+ .map(q -> new TableId(null, schema, q.substring(q.indexOf('.') + 1))
+ .toDoubleQuotedString())
+ .collect(Collectors.joining(", "));
+ // Mirrors debezium PostgresReplicationConnection#initPublication: check existence, then
+ // CREATE ... FOR TABLE / ALTER ... SET TABLE (here always the full include_tables set).
+ try (PostgresConnection conn = dialect.openJdbcConnection();
+ Statement stmt = conn.connection().createStatement()) {
+ long count;
+ try (ResultSet rs =
+ stmt.executeQuery(
+ "SELECT COUNT(1) FROM pg_publication WHERE pubname = '" + pubName + "'")) {
+ rs.next();
+ count = rs.getLong(1);
+ }
+ if (count == 0) {
+ stmt.execute("CREATE PUBLICATION " + pubName + " FOR TABLE " + tableList);
+ } else {
+ stmt.execute("ALTER PUBLICATION " + pubName + " SET TABLE " + tableList);
Review Comment:
This manual `CREATE PUBLICATION` path drops Debezium's
`publish_via_partition_root` handling. The reader still calls
`configFactory.setIncludePartitionedTables(true)`, and the previous
`AutoCreateMode.FILTERED` implementation appended `WITH
(publish_via_partition_root = true)` on PostgreSQL 13+ when creating the
publication. Without that option, changes for a partitioned root table can be
published as the child partition relation instead of the configured root table,
so a from-to job that includes the partitioned parent may fail to route or
apply streaming changes correctly after the snapshot. Please preserve the
previous Debezium behavior here, including the server-version check, and
add/adjust coverage for a partitioned PostgreSQL table.
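One way the manual path could restore the option is sketched minimally below. This is not Debezium's or Doris's actual code. The class `PublicationDdl` and its methods are hypothetical names.

The sketch makes these assumptions:
- It takes a plain `java.sql.Connection`, presumably what `conn.connection()` returns in the hunk above.
- It reads the server major version through the standard JDBC `DatabaseMetaData#getDatabaseMajorVersion()`.
- It treats `pubName` as a safe identifier and `tableList` as the already double-quoted list built in the hunk.

The existence check binds `pubName` as a parameter instead of concatenating it into the SQL literal.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical helper (not Doris/Debezium code): publication DDL with the PG 13+ partition-root option. */
final class PublicationDdl {
    // publish_via_partition_root is a publication parameter introduced in PostgreSQL 13.
    static final int PARTITION_ROOT_MIN_MAJOR_VERSION = 13;

    private PublicationDdl() {}

    /**
     * Returns CREATE ... FOR TABLE (plus WITH (publish_via_partition_root = true) on PG 13+)
     * when the publication does not exist yet, otherwise ALTER ... SET TABLE.
     * Assumes pubName is a safe identifier and tableList is already double-quoted.
     */
    static String buildDdl(String pubName, String tableList, boolean exists, int serverMajorVersion) {
        if (exists) {
            return "ALTER PUBLICATION " + pubName + " SET TABLE " + tableList;
        }
        String ddl = "CREATE PUBLICATION " + pubName + " FOR TABLE " + tableList;
        if (serverMajorVersion >= PARTITION_ROOT_MIN_MAJOR_VERSION) {
            // Publish partition changes using the identity and schema of the partitioned root table.
            ddl += " WITH (publish_via_partition_root = true)";
        }
        return ddl;
    }

    /** Existence check with a bound parameter instead of string concatenation. */
    static boolean publicationExists(Connection conn, String pubName) throws SQLException {
        try (PreparedStatement ps =
                conn.prepareStatement("SELECT COUNT(1) FROM pg_publication WHERE pubname = ?")) {
            ps.setString(1, pubName);
            try (ResultSet rs = ps.executeQuery()) {
                rs.next();
                return rs.getLong(1) > 0;
            }
        }
    }

    /** Creates or updates the publication, using the server major version from JDBC metadata. */
    static void ensurePublication(Connection conn, String pubName, String tableList)
            throws SQLException {
        boolean exists = publicationExists(conn, pubName);
        int major = conn.getMetaData().getDatabaseMajorVersion();
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(buildDdl(pubName, tableList, exists, major));
        }
    }
}
```

As described above, this adds the option only on the CREATE path. A publication that already exists keeps whatever setting it was created with. If that setting also needs to be enforced, PostgreSQL 13+ accepts `ALTER PUBLICATION <name> SET (publish_via_partition_root = true)`.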
--
This is an automated message from the Apache Git Service.