github-actions[bot] commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3427523755


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -618,29 +682,42 @@ public void close(JobBaseConfig jobConfig) {
                     slotName,
                     pubName,
                     jobId);
-            return;
+            return true;
         }
-        try {
-            PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig);
-            PostgresDialect dialect = new PostgresDialect(sourceConfig);
-            if (dropSlot) {
-                LOG.info("Dropping auto-created replication slot {} for job 
{}", slotName, jobId);
-                dialect.removeSlot(slotName);
-            } else {
-                LOG.info("Skipping drop of user-provided slot {} for job {}", 
slotName, jobId);
-            }
-            if (dropPub) {
-                LOG.info("Dropping auto-created publication {} for job {}", 
pubName, jobId);
-                try (PostgresConnection connection = 
dialect.openJdbcConnection()) {
-                    connection.execute("DROP PUBLICATION IF EXISTS " + 
pubName);
-                }
-            } else {
-                LOG.info(
-                        "Skipping drop of user-provided publication {} for job 
{}", pubName, jobId);
+        PostgresDialect dialect = new 
PostgresDialect(getSourceConfig(jobConfig));
+        if (dropPub) {
+            LOG.info("Dropping auto-created publication {} for job {}", 
pubName, jobId);
+            try (PostgresConnection connection = dialect.openJdbcConnection()) 
{
+                connection.execute("DROP PUBLICATION IF EXISTS " + pubName);
+            } catch (Exception ex) {
+                LOG.warn("Failed to drop publication {} for job {}: {}", 
pubName, jobId, ex.getMessage());
             }
+        }
+        if (!dropSlot) {
+            return true;
+        }
+        LOG.info("Dropping auto-created replication slot {} for job {}", 
slotName, jobId);
+        try {
+            dialect.removeSlot(slotName);
         } catch (Exception ex) {
-            LOG.warn(
-                    "Failed to clean up postgres resources for job {}: {}", 
jobId, ex.getMessage());
+            LOG.warn("Drop of replication slot {} for job {} failed: {}", 
slotName, jobId, ex.getMessage());
+        }
+        boolean stillHeld = slotExists(dialect, slotName);
+        if (stillHeld) {
+            LOG.warn("Replication slot {} for job {} still present after drop, 
will retry", slotName, jobId);
+        }
+        return !stillHeld;
+    }
+
+    private boolean slotExists(PostgresDialect dialect, String slotName) {
+        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+            return connection.queryAndMap(
+                    "SELECT 1 FROM pg_replication_slots WHERE slot_name = '" + 
slotName + "'",
+                    rs -> rs.next());

Review Comment:
   `slotExists` treats any verification failure as "slot gone", so 
`releaseSourceResources()` returns true after a failed `removeSlot()` whenever 
this follow-up check cannot connect or execute (for example a transient PG 
outage, SSL/cert problem, or network partition). `/api/close` then removes the 
local context and FE has already deleted the job meta, but no 
`pendingSlotDrops` retry is scheduled, leaving the Doris-owned replication 
slot/source resource behind. Please only return success when the slot is 
definitively absent; on verification errors return `false` or otherwise 
schedule the retry so cleanup continues until the retry window expires.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to