github-actions[bot] commented on code in PR #64075:
URL: https://github.com/apache/doris/pull/64075#discussion_r3348288448


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -150,6 +157,43 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
         }
     }
 
+    /** Create/ensure the Doris-owned publication for all include_tables (idempotent, multi-BE safe). */
+    private void createPublicationForDorisOwned(
+            PostgresDialect dialect, Map<String, String> config, String jobId) {
+        String pubName = resolvePublicationName(config, jobId);
+        String schema = config.get(DataSourceConfigKeys.SCHEMA);
+        String[] qualified = ConfigUtil.getTableList(schema, config);
+        if (qualified.length == 0) {
+            throw new CdcClientException("No tables to create publication " + pubName);
+        }
+        String tableList =
+                Arrays.stream(qualified)
+                        .map(q -> new TableId(null, schema, q.substring(q.indexOf('.') + 1))
+                                .toDoubleQuotedString())
+                        .collect(Collectors.joining(", "));
+        // Mirrors debezium PostgresReplicationConnection#initPublication: check existence, then
+        // CREATE ... FOR TABLE / ALTER ... SET TABLE (here always the full include_tables set).
+        try (PostgresConnection conn = dialect.openJdbcConnection();
+                Statement stmt = conn.connection().createStatement()) {
+            long count;
+            try (ResultSet rs =
+                    stmt.executeQuery(
+                            "SELECT COUNT(1) FROM pg_publication WHERE pubname = '" + pubName + "'")) {
+                rs.next();
+                count = rs.getLong(1);
+            }
+            if (count == 0) {
+                stmt.execute("CREATE PUBLICATION " + pubName + " FOR TABLE " + tableList);
+            } else {
+                stmt.execute("ALTER PUBLICATION " + pubName + " SET TABLE " + tableList);

Review Comment:
   This manual `CREATE PUBLICATION` path drops Debezium's 
`publish_via_partition_root` handling. The reader still calls 
`configFactory.setIncludePartitionedTables(true)`, and the previous 
`AutoCreateMode.FILTERED` implementation appended `WITH 
(publish_via_partition_root = true)` on PostgreSQL 13+ when creating the 
publication. Without that option, changes for a partitioned root table can be 
published as the child partition relation instead of the configured root table, 
so a from-to job that includes the partitioned parent may fail to route or 
apply streaming changes correctly after the snapshot. Please preserve the 
previous Debezium behavior here, including the server-version check, and 
add/adjust coverage for a partitioned PostgreSQL table.
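
A minimal sketch of what preserving the option could look like, offered as an illustration and not as the PR's final code. The class `PublicationSql` and its method names are hypothetical and do not exist in the Doris or Debezium code bases; the only external call is the standard JDBC `DatabaseMetaData#getDatabaseMajorVersion()`. It assumes `pubName` and `tableList` arrive already quoted, as in the diff above, and that PostgreSQL 13 is the first major version that accepts `publish_via_partition_root`.

```java
import java.sql.Connection;
import java.sql.SQLException;

/** Hypothetical helper: builds publication DDL with a server-version check. */
public final class PublicationSql {
    /** Assumption: publish_via_partition_root is accepted from PostgreSQL 13 onward. */
    static final int MIN_MAJOR_FOR_PARTITION_ROOT = 13;

    static final String PARTITION_ROOT_OPTION = "publish_via_partition_root = true";

    private PublicationSql() {}

    /** Reads the server major version through plain JDBC metadata. */
    static int serverMajorVersion(Connection conn) throws SQLException {
        return conn.getMetaData().getDatabaseMajorVersion();
    }

    /** CREATE statement; appends the WITH clause only when the server supports it. */
    static String createPublication(String pubName, String tableList, int majorVersion) {
        String sql = "CREATE PUBLICATION " + pubName + " FOR TABLE " + tableList;
        if (majorVersion >= MIN_MAJOR_FOR_PARTITION_ROOT) {
            sql += " WITH (" + PARTITION_ROOT_OPTION + ")";
        }
        return sql;
    }

    /**
     * Statement that enables the option on an already existing publication,
     * or null when the server is too old to support it.
     */
    static String enablePartitionRoot(String pubName, int majorVersion) {
        if (majorVersion < MIN_MAJOR_FOR_PARTITION_ROOT) {
            return null;
        }
        return "ALTER PUBLICATION " + pubName + " SET (" + PARTITION_ROOT_OPTION + ")";
    }
}
```

In the `count == 0` branch the result of `createPublication(...)` would replace the hand-built string. Whether the `ALTER ... SET TABLE` branch should also run `enablePartitionRoot(...)` for publications created without the option is a design question for the PR author.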



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

