Copilot commented on code in PR #64075:
URL: https://github.com/apache/doris/pull/64075#discussion_r3354055815
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -150,6 +157,43 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
         }
     }
+    /** Create/ensure the Doris-owned publication for all include_tables (idempotent, multi-BE safe). */
+    private void createPublicationForDorisOwned(
+            PostgresDialect dialect, Map<String, String> config, String jobId) {
+        String pubName = resolvePublicationName(config, jobId);
+        String schema = config.get(DataSourceConfigKeys.SCHEMA);
+        String[] qualified = ConfigUtil.getTableList(schema, config);
+        if (qualified.length == 0) {
+            throw new CdcClientException("No tables to create publication " + pubName);
+        }
+        String tableList =
+                Arrays.stream(qualified)
+                        .map(q -> new TableId(null, schema, q.substring(q.indexOf('.') + 1))
+                                .toDoubleQuotedString())
+                        .collect(Collectors.joining(", "));
+        // Mirrors debezium PostgresReplicationConnection#initPublication: check existence, then
+        // CREATE ... FOR TABLE / ALTER ... SET TABLE (here always the full include_tables set).
+        try (PostgresConnection conn = dialect.openJdbcConnection();
+                Statement stmt = conn.connection().createStatement()) {
+            long count;
+            try (ResultSet rs =
+                    stmt.executeQuery(
+                            "SELECT COUNT(1) FROM pg_publication WHERE pubname = '" + pubName + "'")) {
+                rs.next();
+                count = rs.getLong(1);
+            }
+            if (count == 0) {
+                stmt.execute("CREATE PUBLICATION " + pubName + " FOR TABLE " + tableList);
+            } else {
+                stmt.execute("ALTER PUBLICATION " + pubName + " SET TABLE " + tableList);
+            }
Review Comment:
createPublicationForDorisOwned() is not actually “multi-BE safe”: the SELECT COUNT + CREATE/ALTER sequence has a race in which two BEs can both observe count == 0, after which one of the two CREATE PUBLICATION statements fails with duplicate_object (SQLSTATE 42710) and aborts reader initialization. This can make multi-BE jobs flaky, which is exactly the scenario this PR is trying to stabilize. Prefer an atomic-ish approach: try CREATE first and, on duplicate_object, fall back to ALTER ... SET TABLE (or always ALTER after ensuring existence).
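A minimal sketch of the CREATE-first fallback suggested above, assuming the JDBC connection runs in autocommit mode (inside an explicit transaction, the failed CREATE would abort it, so the ALTER would first need a rollback or savepoint). `SqlExecutor`, `FakeCatalog`, `ensurePublication` and `raceTwoBackends` are hypothetical names; in the reader, `exec` could simply be `stmt::execute`. `FakeCatalog` is an in-memory stand-in that only emulates the unique publication name in `pg_publication` (42710 is PostgreSQL's duplicate_object SQLSTATE, 42704 is undefined_object), not real server behavior.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PublicationSketch {
    /** PostgreSQL SQLSTATE duplicate_object: CREATE on a name that already exists. */
    static final String DUPLICATE_OBJECT = "42710";

    /** Hypothetical stand-in for Statement#execute so the logic runs without a database. */
    public interface SqlExecutor {
        void execute(String sql) throws SQLException;
    }

    /**
     * CREATE first; if another BE already created the publication (duplicate_object),
     * fall back to ALTER ... SET TABLE so it still ends up with the full table list.
     * Returns true only if this call created the publication.
     */
    public static boolean ensurePublication(SqlExecutor exec, String pubName, String tableList)
            throws SQLException {
        try {
            exec.execute("CREATE PUBLICATION " + pubName + " FOR TABLE " + tableList);
            return true;
        } catch (SQLException e) {
            if (!DUPLICATE_OBJECT.equals(e.getSQLState())) {
                throw e; // unrelated failure (permissions, missing table, ...): surface it
            }
        }
        exec.execute("ALTER PUBLICATION " + pubName + " SET TABLE " + tableList);
        return false;
    }

    /** Hypothetical in-memory pg_publication; putIfAbsent makes CREATE atomic like a unique name. */
    public static final class FakeCatalog implements SqlExecutor {
        public final Map<String, String> publications = new ConcurrentHashMap<>();

        @Override
        public void execute(String sql) throws SQLException {
            // e.g. [CREATE, PUBLICATION, doris_pub, FOR TABLE "public"."t1"]
            String[] parts = sql.split(" ", 4);
            String name = parts[2];
            String tables = parts[3].replaceFirst("^(FOR|SET) TABLE ", "");
            if (parts[0].equals("CREATE")) {
                if (publications.putIfAbsent(name, tables) != null) {
                    throw new SQLException("publication \"" + name + "\" already exists", DUPLICATE_OBJECT);
                }
            } else if (publications.replace(name, tables) == null) {
                throw new SQLException("publication \"" + name + "\" does not exist", "42704");
            }
        }
    }

    /** Two simulated BEs call ensurePublication at the same moment; returns how many CREATEd. */
    public static int raceTwoBackends(SqlExecutor exec, String pubName, String tableList)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch start = new CountDownLatch(1);
        List<Future<Boolean>> results = new ArrayList<>();
        for (int be = 0; be < 2; be++) {
            results.add(pool.submit(() -> {
                start.await(); // release both threads together to provoke the race
                return ensurePublication(exec, pubName, tableList);
            }));
        }
        start.countDown();
        int created = 0;
        try {
            for (Future<Boolean> f : results) {
                if (f.get()) { // throws ExecutionException if a BE's call failed
                    created++;
                }
            }
        } finally {
            pool.shutdown();
        }
        return created;
    }

    public static void main(String[] args) throws Exception {
        FakeCatalog catalog = new FakeCatalog();
        String tables = "\"public\".\"t1\", \"public\".\"t2\"";
        int created = raceTwoBackends(catalog, "doris_pub", tables);
        // prints: created=1 tables="public"."t1", "public"."t2"
        System.out.println("created=" + created + " tables=" + catalog.publications.get("doris_pub"));
    }
}
```

Branching on the SQLSTATE rather than the error message keeps the fallback independent of server locale, and the BE that loses the race just reapplies the same SET TABLE instead of failing initialization.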
--
This is an automated message from the Apache Git Service.