This is an automated email from the ASF dual-hosted git repository.
zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9896994e9e9 [Fix](pyudf) clear Nereids UDF registry on drop database
(#62950)
9896994e9e9 is described below
commit 9896994e9e9b6ab2663d3464f3eb35b5ba2ddeb0
Author: linrrarity <[email protected]>
AuthorDate: Mon May 11 14:35:17 2026 +0800
[Fix](pyudf) clear Nereids UDF registry on drop database (#62950)
Nereids resolves UDF calls from FunctionRegistry, while SHOW FUNCTIONS reads
catalog metadata. After DROP DATABASE and re-creating a database with the same
name, the catalog metadata could be empty while FunctionRegistry still contained
stale Python UDF builders, causing SELECT to bind and execute the old function
body.
```sql
DROP DATABASE IF EXISTS registry_test_db;
CREATE DATABASE registry_test_db;
USE registry_test_db;
DROP FUNCTION IF EXISTS py_exc_cache_test(INT);
CREATE FUNCTION py_exc_cache_test(INT)
RETURNS INT
PROPERTIES (
    "type" = "PYTHON_UDF",
    "symbol" = "evaluate",
    "runtime_version" = "3.12.11",
    "always_nullable" = "true"
)
AS $$
def evaluate(x):
    if x is None:
        return None
    return x + 1
$$;
-- Normal operation.
SELECT py_exc_cache_test(10); -- 11
-- Drop the database directly. Before this fix, the FunctionRegistry entries
-- registered under this database were not cleaned up.
DROP DATABASE registry_test_db FORCE;
CREATE DATABASE registry_test_db;
USE registry_test_db;
-- SHOW FUNCTIONS reads the catalog via db.getFunctions().
SHOW FUNCTIONS LIKE 'py_exc_cache_test';
-- empty
-- Function execution resolves through FunctionRegistry. Before this fix the
-- leftover registry entry was used and this returned 11 (bug); after the fix
-- the function is no longer found.
SELECT py_exc_cache_test(10);
-- Create a new function with the same name. Before this fix, the new builder
-- was appended to the end of the stale registry entries, so calls still ran
-- the previous x + 1 body even after the database was dropped (bug).
DROP FUNCTION IF EXISTS py_exc_cache_test(INT);
CREATE FUNCTION py_exc_cache_test(INT)
RETURNS INT
PROPERTIES (
    "type" = "PYTHON_UDF",
    "symbol" = "evaluate",
    "runtime_version" = "3.12.11",
    "always_nullable" = "true"
)
AS $$
def evaluate(x):
    if x is None:
        return None
    return x + 999
$$;
SELECT py_exc_cache_test(10); -- before this fix: still 11 (bug); expected 1009
```
---
.../apache/doris/catalog/CatalogRecycleBin.java | 7 +-
.../java/org/apache/doris/catalog/Database.java | 39 ++++++--
.../org/apache/doris/catalog/FunctionRegistry.java | 9 ++
.../apache/doris/datasource/InternalCatalog.java | 13 +++
.../doris/catalog/CatalogRecycleBinTest.java | 49 +++++++++-
.../suites/pythonudf_p0/test_pythonudf_drop.groovy | 105 +++++++++++++++++++++
6 files changed, 210 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index b60c8ba45e8..60af9c64591 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -1698,7 +1698,7 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
}
public void readFields(DataInput in) throws IOException {
- db = Database.read(in);
+ db = Database.readForRecycleBin(in);
int count = in.readInt();
for (int i = 0; i < count; i++) {
@@ -1710,7 +1710,10 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
long tableId = in.readLong();
tableIds.add(tableId);
}
- GsonUtils.GSON.fromJson(Text.readString(in),
RecycleDatabaseInfo.class);
+ // Consume legacy trailing json for stream compatibility.
+ // Do not deserialize it because nested Database.gsonPostProcess()
+ // would register functions from recycle bin into Nereids registry.
+ Text.readString(in);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 9afec0ab1bc..b9411d7731e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -85,6 +85,7 @@ public class Database extends MetaObject implements Writable,
DatabaseIf<Table>,
private static final Logger LOG = LogManager.getLogger(Database.class);
private static final String TRANSACTION_QUOTA_SIZE =
"transactionQuotaSize";
+ private static final ThreadLocal<Boolean> SKIP_REGISTER_NEREIDS_FUNCTIONS
= new ThreadLocal<>();
@SerializedName(value = "id")
private long id;
@@ -630,10 +631,28 @@ public class Database extends MetaObject implements
Writable, DatabaseIf<Table>,
}
public static Database read(DataInput in) throws IOException {
+ return read(in, false);
+ }
+
+ public static Database readForRecycleBin(DataInput in) throws IOException {
+ return read(in, true);
+ }
+
+ private static Database read(DataInput in, boolean
skipRegisterNereidsFunctions) throws IOException {
LOG.info("read db from journal {}", in);
- Database db = GsonUtils.GSON.fromJson(Text.readString(in),
Database.class);
- db.readTables(in);
- return db;
+ Boolean previous = SKIP_REGISTER_NEREIDS_FUNCTIONS.get();
+ SKIP_REGISTER_NEREIDS_FUNCTIONS.set(skipRegisterNereidsFunctions);
+ try {
+ Database db = GsonUtils.GSON.fromJson(Text.readString(in),
Database.class);
+ db.readTables(in);
+ return db;
+ } finally {
+ if (previous == null) {
+ SKIP_REGISTER_NEREIDS_FUNCTIONS.remove();
+ } else {
+ SKIP_REGISTER_NEREIDS_FUNCTIONS.set(previous);
+ }
+ }
}
private void writeTables(DataOutput out) throws IOException {
@@ -665,12 +684,14 @@ public class Database extends MetaObject implements
Writable, DatabaseIf<Table>,
transactionQuotaSize = Long.parseLong(txnQuotaStr);
binlogConfig = dbProperties.getBinlogConfig();
- for (ImmutableList<Function> functions : name2Function.values()) {
- for (Function function : functions) {
- try {
- FunctionUtil.translateToNereids(this.getFullName(),
function);
- } catch (Exception e) {
- LOG.warn("Nereids add function failed", e);
+ if (!Boolean.TRUE.equals(SKIP_REGISTER_NEREIDS_FUNCTIONS.get())) {
+ for (ImmutableList<Function> functions : name2Function.values()) {
+ for (Function function : functions) {
+ try {
+ FunctionUtil.translateToNereids(this.getFullName(),
function);
+ } catch (Exception e) {
+ LOG.warn("Nereids add function failed", e);
+ }
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionRegistry.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionRegistry.java
index 619de51e50d..1a8402520ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionRegistry.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionRegistry.java
@@ -322,6 +322,15 @@ public class FunctionRegistry {
}
}
+ public void dropUdfByDb(String dbName) {
+ if (dbName == null) {
+ dbName = GLOBAL_FUNCTION;
+ }
+ synchronized (name2UdfBuilders) {
+ name2UdfBuilders.remove(dbName);
+ }
+ }
+
/**
* use for search appropriate signature for UDFs if candidate more than
one.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 5ed2a2edde3..ccae5a6abb5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -42,6 +42,8 @@ import
org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.DynamicPartitionProperty;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.Function;
+import org.apache.doris.catalog.FunctionUtil;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.InfoSchemaDb;
@@ -537,6 +539,7 @@ public class InternalCatalog implements CatalogIf<Database>
{
// 3. remove db from catalog
idToDb.remove(db.getId());
fullNameToDb.remove(db.getFullName());
+
Env.getCurrentEnv().getFunctionRegistry().dropUdfByDb(db.getFullName());
DropDbInfo info = new DropDbInfo(dbName, force, recycleTime);
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
db.getId());
Env.getCurrentEnv().getDictionaryManager().dropDbDictionaries(dbName);
@@ -595,6 +598,7 @@ public class InternalCatalog implements CatalogIf<Database>
{
fullNameToDb.remove(dbName);
idToDb.remove(db.getId());
+ Env.getCurrentEnv().getFunctionRegistry().dropUdfByDb(dbName);
} finally {
unlock();
}
@@ -644,6 +648,7 @@ public class InternalCatalog implements CatalogIf<Database>
{
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L,
newDbName, "", "", "", "");
Env.getCurrentEnv().getEditLog().logRecoverDb(recoverInfo);
db.unmarkDropped();
+ registerDbFunctionsToNereids(db);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
db.writeUnlock();
@@ -726,9 +731,17 @@ public class InternalCatalog implements
CatalogIf<Database> {
// add db to catalog
replayCreateDb(db, newDbName);
db.unmarkDropped();
+ registerDbFunctionsToNereids(db);
LOG.info("replay recover db[{}]", dbId);
}
+ private void registerDbFunctionsToNereids(Database db) {
+ // A recovered database reuses catalog Function objects, so rebuild
their Nereids builders.
+ for (Function function : db.getFunctions()) {
+ FunctionUtil.translateToNereids(db.getFullName(), function);
+ }
+ }
+
public void alterDatabaseQuota(String dbName, QuotaType quotaType, long
quotaValue) throws DdlException {
Database db = getDbOrDdlException(dbName);
db.writeLockOrDdlException();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
index 918bb857c34..99831c65578 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java
@@ -17,12 +17,16 @@
package org.apache.doris.catalog;
+import org.apache.doris.catalog.Function.BinaryType;
+import org.apache.doris.catalog.Function.NullableMode;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.URI;
+import org.apache.doris.nereids.trees.expressions.functions.FunctionBuilder;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.utframe.UtFrameUtils;
@@ -34,6 +38,10 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
@@ -166,6 +174,46 @@ public class CatalogRecycleBinTest {
Assert.assertNull(recycleBin.getRecycleTimeById(CatalogTestUtil.testDbId1));
}
+ @Test
+ public void testReadRecycleBinDatabaseDoesNotRegisterNereidsFunctions()
throws Exception {
+ CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
+ String dbName = "recycle_db_with_python_udf";
+ String functionName = "recycled_py_udf";
+ Database db = new Database(10001, dbName);
+ ScalarFunction function = ScalarFunction.createUdf(
+ BinaryType.PYTHON_UDF,
+ new FunctionName(dbName, functionName),
+ new Type[] {Type.INT},
+ Type.INT,
+ false,
+ URI.create("file:///tmp/recycled_py_udf.py"),
+ "evaluate",
+ null,
+ null);
+ function.setRuntimeVersion("3.8");
+ function.setFunctionCode("def evaluate(x):\n return x + 1\n");
+ function.setNullableMode(NullableMode.ALWAYS_NULLABLE);
+ function.setId(10002);
+ db.replayAddFunction(function);
+ Assert.assertTrue(hasUdfBuilder(dbName, functionName));
+
+ Assert.assertTrue(recycleBin.recycleDatabase(db, Sets.newHashSet(),
Sets.newHashSet(), false, false, 0));
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ recycleBin.write(new DataOutputStream(outputStream));
+
+ Env.getCurrentEnv().getFunctionRegistry().dropUdfByDb(dbName);
+ Assert.assertFalse(hasUdfBuilder(dbName, functionName));
+
+ CatalogRecycleBin.read(new DataInputStream(new
ByteArrayInputStream(outputStream.toByteArray())));
+ Assert.assertFalse(hasUdfBuilder(dbName, functionName));
+ }
+
+ private boolean hasUdfBuilder(String dbName, String functionName) {
+ Map<String, List<FunctionBuilder>> buildersByName =
+
Env.getCurrentEnv().getFunctionRegistry().getName2UdfBuilders().get(dbName);
+ return buildersByName != null &&
buildersByName.containsKey(functionName);
+ }
+
@Test
public void testRecycleTable() {
CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
@@ -1011,4 +1059,3 @@ public class CatalogRecycleBinTest {
}
}
}
-
diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
b/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
index 3c2c5f9258b..9ce214c5930 100644
--- a/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
@@ -124,10 +124,115 @@ suite("test_pythonudf_drop", "nonConcurrent") {
qt_py_udf_drop_5 """SELECT py_drop_reconnect(32);"""
try_sql("DROP FUNCTION IF EXISTS py_drop_reconnect(INT);")
+
+ // Case 4: recreating the same signature must use the new inline
function body.
+ sql """DROP FUNCTION IF EXISTS py_drop_recreate(INT)"""
+ sql """
+ CREATE FUNCTION py_drop_recreate(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ )
+ AS \$\$
+def evaluate(x):
+ if x is None:
+ return None
+ return x + 1
+\$\$
+ """
+ def recreateOldResult = sql """SELECT py_drop_recreate(10);"""
+ assert recreateOldResult[0][0] == 11
+
+ sql """DROP FUNCTION IF EXISTS py_drop_recreate(INT)"""
+ sql """
+ CREATE FUNCTION py_drop_recreate(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ )
+ AS \$\$
+def evaluate(x):
+ if x is None:
+ return None
+ return x + 999
+\$\$
+ """
+ def recreateNewResult = sql """SELECT py_drop_recreate(10);"""
+ assert recreateNewResult[0][0] == 1009
+ sql """DROP FUNCTION IF EXISTS py_drop_recreate(INT)"""
+
+ // Case 5: dropping a database must also clear Nereids UDF registry.
+ // SHOW FUNCTIONS reads catalog metadata, while SELECT resolves from
FunctionRegistry.
+ // Without registry cleanup, SELECT could still bind the stale x + 1
inline UDF
+ // after the database had been dropped and recreated.
+ def originalDb = sql("SELECT DATABASE()")[0][0]
+ def registryDb = "${originalDb}_registry_cleanup"
+ try {
+ sql """DROP DATABASE IF EXISTS ${registryDb} FORCE"""
+ sql """CREATE DATABASE ${registryDb}"""
+ sql """USE ${registryDb}"""
+ sql """
+ CREATE FUNCTION py_drop_db_registry(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ )
+ AS \$\$
+def evaluate(x):
+ if x is None:
+ return None
+ return x + 1
+\$\$
+ """
+ def oldResult = sql """SELECT py_drop_db_registry(10);"""
+ assert oldResult[0][0] == 11
+
+ sql """DROP DATABASE ${registryDb} FORCE"""
+ sql """CREATE DATABASE ${registryDb}"""
+ sql """USE ${registryDb}"""
+ def functions = sql """SHOW FUNCTIONS LIKE 'py_drop_db_registry'"""
+ assert functions.isEmpty()
+ test {
+ sql """SELECT py_drop_db_registry(10);"""
+ exception "Can not found function"
+ }
+
+ sql """
+ CREATE FUNCTION py_drop_db_registry(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ )
+ AS \$\$
+def evaluate(x):
+ if x is None:
+ return None
+ return x + 999
+\$\$
+ """
+ def rebuiltResult = sql """SELECT py_drop_db_registry(10);"""
+ assert rebuiltResult[0][0] == 1009
+ } finally {
+ sql """USE ${originalDb}"""
+ try_sql("DROP DATABASE IF EXISTS ${registryDb} FORCE")
+ }
} finally {
try_sql("DROP FUNCTION IF EXISTS py_drop_once(INT);")
try_sql("DROP FUNCTION IF EXISTS py_drop_a(INT);")
try_sql("DROP FUNCTION IF EXISTS py_drop_b(INT);")
try_sql("DROP FUNCTION IF EXISTS py_drop_reconnect(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_drop_recreate(INT);")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]