Copilot commented on code in PR #63514: URL: https://github.com/apache/doris/pull/63514#discussion_r3287646879
########## regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy: ########## @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Mirror of test_streaming_mysql_job_special_offset_restart_fe for the PG path: +// CREATE JOB with a JSON LSN offset, sync, restart FE, verify currentOffset +// survives the replay and subsequent binlog DML still lands. +// +// PG-specific wrinkle: an auto-managed slot starts retaining WAL only at slot +// creation time, so a CREATE-with-past-LSN against an auto slot would fail +// because PG has already purged the requested LSN. We therefore pre-create a +// user-provided slot first — that pins the WAL retention horizon back in time +// far enough to make the LSN we capture valid. 
+suite("test_streaming_postgres_job_special_offset_restart_fe", + "docker,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_pg_special_offset_restart_fe" + def options = new ClusterOptions() + options.setFeNum(1) + options.cloudMode = null + + docker(options) { + def currentDb = (sql "select database()")[0][0] + def table1 = "special_offset_restart_pg_tbl" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + def userSlot = "special_offset_restart_slot" + def userPub = "special_offset_restart_pub" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // Setup: fresh PG table + fresh user slot/pub. Slot must be created + // BEFORE the LSN we capture below, otherwise PG would have purged + // the WAL covering that LSN by the time the job tries to replay it. 
+ def lsnAtCreate = "" + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" int PRIMARY KEY, + "name" varchar(100) + )""" + sql """DROP PUBLICATION IF EXISTS ${userPub}""" + sql """CREATE PUBLICATION ${userPub} FOR TABLE ${pgDB}.${pgSchema}.${table1}""" + def existing = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + if (existing[0][0] != 0) { + sql """SELECT pg_drop_replication_slot('${userSlot}')""" + } + sql """SELECT pg_create_logical_replication_slot('${userSlot}', 'pgoutput')""" + + // Capture LSN AFTER slot creation, BEFORE the INSERTs the job will read. + def lsnRows = sql """SELECT pg_current_wal_lsn()::text""" + def lsnStr = lsnRows[0][0].toString() + def parts = lsnStr.split("/") + def high = Long.parseLong(parts[0], 16) + def low = Long.parseLong(parts[1], 16) + lsnAtCreate = String.valueOf((high << 32) + low) + log.info("CREATE LSN mark: ${lsnStr} -> numeric: ${lsnAtCreate}") Review Comment: An LSN is an unsigned 64-bit value, but (high << 32) + low is evaluated as a signed Java/Groovy long. Once the high word reaches 0x80000000 (i.e. LSN ≥ 2^63, possible on instances that have generated a very large volume of WAL), the shift overflows and produces a negative, incorrect decimal string. Use BigInteger instead (as test_streaming_postgres_job_slot_lsn_advance does), e.g. BigInteger.valueOf(high).shiftLeft(32).add(BigInteger.valueOf(low)), so the JSON offset stays correct across the full LSN range. 
########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java: ########## @@ -923,19 +921,20 @@ protected abstract Map<TableId, TableChanges.TableChange> discoverTableSchemas( public void close(JobBaseConfig jobConfig) { LOG.info("Close source reader for job {}", jobConfig.getJobId()); - // Cancel any active poll operations - if (activePollFutures != null) { - activePollFutures.forEach(f -> f.cancel(true)); - activePollFutures.clear(); - activePollFutures = null; + // Cancel outside the lock so a thread blocked in anyOf.join() releases it. + List<CompletableFuture<PollResult>> activePolls = this.activePollFutures; + if (activePolls != null) { + for (CompletableFuture<PollResult> f : activePolls) { + f.cancel(true); + } } - // Clean up all readers - finishSplitRecords(); - - if (tableSchemas != null) { - tableSchemas.clear(); - tableSchemas = null; + synchronized (this) { + finishSplitRecords(); + if (tableSchemas != null) { + tableSchemas.clear(); + tableSchemas = null; + } } } Review Comment: pollExecutor is created per reader in initialize() but is never shut down in close()/finishSplitRecords(). If many CDC jobs are created/dropped in the cdc_client process, this can leak thread-pool threads and memory. Consider shutting down the executor (e.g., shutdownNow + awaitTermination) during close, after cancelling active futures. ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java: ########## @@ -1130,10 +1127,20 @@ private Map<TableId, TableChanges.TableChange> discoverTableSchemas(JobBaseConfi public void close(JobBaseConfig jobConfig) { LOG.info("Close source reader for job {}", jobConfig.getJobId()); - finishSplitRecords(); - if (tableSchemas != null) { - tableSchemas.clear(); - tableSchemas = null; + // Cancel outside the lock so a thread blocked in anyOf.join() releases it. 
+ List<CompletableFuture<PollResult>> activePolls = this.activePollFutures; + if (activePolls != null) { + for (CompletableFuture<PollResult> f : activePolls) { + f.cancel(true); + } + } + + synchronized (this) { + finishSplitRecords(); + if (tableSchemas != null) { + tableSchemas.clear(); + tableSchemas = null; + } } Review Comment: pollExecutor is created per reader in initialize() but never shut down in close()/finishSplitRecords(). Over time, repeated job create/drop cycles will leak threads and associated resources (even if the pool's threads are daemon threads). Consider calling pollExecutor.shutdownNow() (and nulling it) during close, after cancelling futures, to fully release the thread pool. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
