This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 68db939d9bc branch-3.1: [feat](bi) support default_init_catalog user property for external catalog #49658 #54234 (#54232)
68db939d9bc is described below
commit 68db939d9bcc3d17fd62f7adc666e0dc77174289
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Aug 3 19:03:14 2025 -0700
branch-3.1: [feat](bi) support default_init_catalog user property for external catalog #49658 #54234 (#54232)
bp #49658 #54234
---------
Co-authored-by: camby <[email protected]>
---
.../java/org/apache/doris/mysql/MysqlProto.java | 16 ++++++++
.../org/apache/doris/mysql/privilege/Auth.java | 9 +++++
.../mysql/privilege/CommonUserProperties.java | 12 ++++++
.../apache/doris/mysql/privilege/UserProperty.java | 21 ++++++++++
.../doris/mysql/privilege/UserPropertyMgr.java | 9 +++++
.../org/apache/doris/catalog/UserPropertyTest.java | 44 +++++++++++++++++++++
.../org/apache/doris/mysql/MysqlProtoTest.java | 45 +++++++++++++++++++++-
.../apache/doris/planner/ResourceTagQueryTest.java | 2 +-
8 files changed, 156 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
index a672a217a33..7fed32d1aca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
@@ -206,6 +207,21 @@ public class MysqlProto {
return false;
}
+ // try to change catalog, if default_init_catalog inside user property is not 'internal'
+ try {
+ String userInitCatalog =
Env.getCurrentEnv().getAuth().getInitCatalog(context.getQualifiedUser());
+ if (userInitCatalog != null &&
!userInitCatalog.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
+ CatalogIf catalogIf =
context.getEnv().getCatalogMgr().getCatalog(userInitCatalog);
+ if (catalogIf != null) {
+ context.getEnv().changeCatalog(context, userInitCatalog);
+ }
+ }
+ } catch (DdlException e) {
+ context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+ sendResponsePacket(context);
+ return false;
+ }
+
// set database
String db = authPacket.getDb();
if (!Strings.isNullOrEmpty(db)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
index 1ad3289803c..e1b0a972a7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
@@ -1221,6 +1221,15 @@ public class Auth implements Writable {
}
}
+ public String getInitCatalog(String qualifiedUser) {
+ readLock();
+ try {
+ return propertyMgr.getInitCatalog(qualifiedUser);
+ } finally {
+ readUnlock();
+ }
+ }
+
public String getWorkloadGroup(String qualifiedUser) {
readLock();
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java
index db838a91a56..12ed8cea083 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java
@@ -19,6 +19,7 @@ package org.apache.doris.mysql.privilege;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
@@ -66,6 +67,9 @@ public class CommonUserProperties implements Writable, GsonPostProcessable {
@SerializedName(value = "it", alternate = {"insertTimeout"})
private int insertTimeout = -1;
+ @SerializedName(value = "ic")
+ private String initCatalog = InternalCatalog.INTERNAL_CATALOG_NAME;
+
@SerializedName(value = "wg", alternate = {"workloadGroup"})
private String workloadGroup = WorkloadGroupMgr.DEFAULT_GROUP_NAME;
@@ -156,6 +160,14 @@ public class CommonUserProperties implements Writable, GsonPostProcessable {
this.insertTimeout = insertTimeout;
}
+ public String getInitCatalog() {
+ return initCatalog;
+ }
+
+ public void setInitCatalog(String initCatalog) {
+ this.initCatalog = initCatalog;
+ }
+
public String getWorkloadGroup() {
return workloadGroup;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java
index 58a8c2f7ef9..b79530af6f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.load.DppConfig;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
@@ -78,6 +79,7 @@ public class UserProperty implements Writable {
public static final String PROP_QUOTA = "quota";
+ public static final String PROP_DEFAULT_INIT_CATALOG =
"default_init_catalog";
public static final String PROP_WORKLOAD_GROUP = "default_workload_group";
public static final String DEFAULT_CLOUD_CLUSTER = "default_cloud_cluster";
@@ -126,6 +128,7 @@ public class UserProperty implements Writable {
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_USER_INSERT_TIMEOUT
+ "$", Pattern.CASE_INSENSITIVE));
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".",
Pattern.CASE_INSENSITIVE));
+ COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_INIT_CATALOG
+ "$", Pattern.CASE_INSENSITIVE));
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_WORKLOAD_GROUP + "$",
Pattern.CASE_INSENSITIVE));
COMMON_PROPERTIES.add(Pattern.compile("^" + DEFAULT_CLOUD_CLUSTER +
"$", Pattern.CASE_INSENSITIVE));
COMMON_PROPERTIES.add(Pattern.compile("^" + DEFAULT_COMPUTE_GROUP +
"$", Pattern.CASE_INSENSITIVE));
@@ -170,6 +173,10 @@ public class UserProperty implements Writable {
return commonProperties.getCpuResourceLimit();
}
+ public String getInitCatalog() {
+ return commonProperties.getInitCatalog();
+ }
+
public String getWorkloadGroup() {
return commonProperties.getWorkloadGroup();
}
@@ -202,6 +209,7 @@ public class UserProperty implements Writable {
long execMemLimit = this.commonProperties.getExecMemLimit();
int queryTimeout = this.commonProperties.getQueryTimeout();
int insertTimeout = this.commonProperties.getInsertTimeout();
+ String initCatalog = this.commonProperties.getInitCatalog();
String workloadGroup = this.commonProperties.getWorkloadGroup();
String newDefaultCloudCluster = defaultCloudCluster;
@@ -317,6 +325,15 @@ public class UserProperty implements Writable {
} catch (NumberFormatException e) {
throw new DdlException(PROP_USER_INSERT_TIMEOUT + " is not
number");
}
+ } else if (keyArr[0].equalsIgnoreCase(PROP_DEFAULT_INIT_CATALOG)) {
+ if (keyArr.length != 1) {
+ throw new DdlException(PROP_DEFAULT_INIT_CATALOG + "
format error");
+ }
+ CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(value);
+ if (catalog == null) {
+ throw new DdlException("catalog " + value + " not exists");
+ }
+ initCatalog = value;
} else if (keyArr[0].equalsIgnoreCase(PROP_WORKLOAD_GROUP)) {
if (keyArr.length != 1) {
throw new DdlException(PROP_WORKLOAD_GROUP + " format
error");
@@ -348,6 +365,7 @@ public class UserProperty implements Writable {
this.commonProperties.setExecMemLimit(execMemLimit);
this.commonProperties.setQueryTimeout(queryTimeout);
this.commonProperties.setInsertTimeout(insertTimeout);
+ this.commonProperties.setInitCatalog(initCatalog);
this.commonProperties.setWorkloadGroup(workloadGroup);
defaultCloudCluster = newDefaultCloudCluster;
}
@@ -441,6 +459,9 @@ public class UserProperty implements Writable {
// resource tag
result.add(Lists.newArrayList(PROP_RESOURCE_TAGS, Joiner.on(",
").join(commonProperties.getResourceTags())));
+ // init catalog
+ result.add(Lists.newArrayList(PROP_DEFAULT_INIT_CATALOG,
String.valueOf(commonProperties.getInitCatalog())));
+
result.add(Lists.newArrayList(PROP_WORKLOAD_GROUP,
String.valueOf(commonProperties.getWorkloadGroup())));
// default cloud cluster
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java
index 703e157c786..d34dbb9aeae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java
@@ -214,6 +214,15 @@ public class UserPropertyMgr implements Writable {
return existProperty.getExecMemLimit();
}
+ public String getInitCatalog(String qualifiedUser) {
+ UserProperty existProperty = propertyMap.get(qualifiedUser);
+ existProperty = getPropertyIfNull(qualifiedUser, existProperty);
+ if (existProperty == null) {
+ return null;
+ }
+ return existProperty.getInitCatalog();
+ }
+
public String getWorkloadGroup(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getPropertyIfNull(qualifiedUser, existProperty);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java
index 75d810c9f7e..6afe9b816b1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java
@@ -22,6 +22,9 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.CatalogMgr;
+import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.UserProperty;
import com.google.common.collect.Lists;
@@ -94,6 +97,7 @@ public class UserPropertyTest {
DataInputStream inputStream = new DataInputStream(new
ByteArrayInputStream(byteStream.toByteArray()));
UserProperty newProperty = UserProperty.read(inputStream);
Assert.assertEquals(qualifiedUser, newProperty.getQualifiedUser());
+ Assert.assertEquals(property.getInitCatalog(), "internal");
}
@Test
@@ -165,4 +169,44 @@ public class UserPropertyTest {
}
Assert.assertEquals(-1, userProperty.getCpuResourceLimit());
}
+
+ @Test
+ public void testUpdateInitCatalog(@Mocked Env env, @Mocked CatalogMgr
catalogMgr) throws UserException {
+ CatalogIf internalCatalog = new InternalCatalog();
+ new Expectations() {
+ {
+ Env.getCurrentEnv();
+ minTimes = 1;
+ result = env;
+
+ env.getCatalogMgr();
+ minTimes = 1;
+ result = catalogMgr;
+
+ catalogMgr.getCatalog(anyString);
+ minTimes = 2;
+ result = null;
+ result = internalCatalog;
+ }
+ };
+
+ // for non exist catalog, use internal
+ List<Pair<String, String>> properties = Lists.newArrayList();
+ properties.add(Pair.of("default_init_catalog", "non_exist_catalog"));
+ UserProperty userProperty = new UserProperty();
+ try {
+ userProperty.update(properties);
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("not exists"));
+ }
+ Assert.assertEquals("internal", userProperty.getInitCatalog());
+
+ // for exist catalog, use it directly
+ properties = Lists.newArrayList();
+ properties.add(Pair.of("default_init_catalog", "exist_catalog"));
+ userProperty = new UserProperty();
+ userProperty.update(properties);
+ Assert.assertEquals("exist_catalog", userProperty.getInitCatalog());
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlProtoTest.java b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlProtoTest.java
index 07398032e86..0142b285dd7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlProtoTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlProtoTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.authenticate.AuthenticateRequest;
import org.apache.doris.mysql.authenticate.AuthenticatorManager;
@@ -37,6 +38,7 @@ import org.apache.doris.qe.ConnectContext;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
+import mockit.Verifications;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -249,6 +251,28 @@ public class MysqlProtoTest {
};
}
+ private void mockInitCatalog(@Mocked CatalogMgr catalogMgr, ConnectContext
context, String initCatalog)
+ throws Exception {
+ new Expectations() {
+ {
+ auth.getInitCatalog(anyString);
+ minTimes = 0;
+ result = initCatalog;
+
+ env.getCatalogMgr();
+ minTimes = 0;
+ result = catalogMgr;
+
+ catalogMgr.getCatalog(anyString);
+ minTimes = 0;
+ result = catalog;
+
+ env.changeCatalog(context, anyString);
+ minTimes = 0;
+ }
+ };
+ }
+
@Test
public void testNegotiate() throws Exception {
mockChannel("user", true);
@@ -260,6 +284,26 @@ public class MysqlProtoTest {
Assert.assertTrue(MysqlProto.negotiate(context));
}
+ @Test
+ public void testNegotiateInitCatalog(@Mocked CatalogMgr catalogMgr) throws
Exception {
+ mockChannel("user", true);
+ mockPassword(true);
+ mockAccess();
+ ConnectContext context = new ConnectContext(streamConnection);
+ context.setEnv(env);
+ context.setThreadLocalInfo();
+ String initCatalog = "external_catalog";
+ mockInitCatalog(catalogMgr, context, initCatalog);
+ Assert.assertTrue(MysqlProto.negotiate(context));
+
+ new Verifications() {
+ {
+ env.changeCatalog(context, initCatalog);
+ times = 1;
+ }
+ };
+ }
+
@Test
public void testNegotiateSendFail() throws Exception {
mockChannel("user", false);
@@ -346,5 +390,4 @@ public class MysqlProtoTest {
Assert.assertEquals("i have dream", new
String(MysqlProto.readNulTerminateString(buffer)));
Assert.assertEquals("you have dream too", new
String(MysqlProto.readEofString(buffer)));
}
-
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
index 207bddae1b3..850d8b27b06 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
@@ -280,7 +280,7 @@ public class ResourceTagQueryTest {
Assert.assertEquals(1000000, execMemLimit);
List<List<String>> userProps =
Env.getCurrentEnv().getAuth().getUserProperties(Auth.ROOT_USER);
- Assert.assertEquals(12, userProps.size());
+ Assert.assertEquals(13, userProps.size());
// now :
// be1 be2 be3 ==>tag1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]