This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 8a7f5c45d5f branch-4.0: [Improve](job) add priv check for streaming
job #56965 (#57068)
8a7f5c45d5f is described below
commit 8a7f5c45d5f2c673d3f48dfb78de9f6c59f1a71c
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Oct 18 10:35:24 2025 +0800
branch-4.0: [Improve](job) add priv check for streaming job #56965 (#57068)
Cherry-picked from #56965
Co-authored-by: wudi <[email protected]>
---
.../insert/streaming/StreamingInsertJob.java | 53 ++++++
.../org/apache/doris/job/manager/JobManager.java | 10 +-
.../trees/plans/commands/AlterJobCommand.java | 29 ++-
.../plans/commands/AlterJobStatusCommand.java | 23 ++-
.../trees/plans/commands/DropJobCommand.java | 24 ++-
.../trees/plans/commands/PauseJobCommand.java | 7 -
.../trees/plans/commands/ResumeJobCommand.java | 7 -
.../trees/plans/commands/info/CreateJobInfo.java | 7 +-
.../tablefunction/JobsTableValuedFunction.java | 7 -
.../doris/tablefunction/MetadataGenerator.java | 23 +++
.../tablefunction/TasksTableValuedFunction.java | 7 -
.../suites/auth_call/test_ddl_job_auth.groovy | 22 ++-
.../test_streaming_insert_job_crud.groovy | 19 ++
.../test_streaming_insert_job_priv.groovy | 204 +++++++++++++++++++++
14 files changed, 393 insertions(+), 49 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 010b27ff4ed..fb7568d7637 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -23,11 +23,14 @@ import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
@@ -44,11 +47,13 @@ import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.info.BaseViewInfo;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertUtils;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -409,6 +414,54 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
return trow;
}
+ private static boolean checkPrivilege(ConnectContext ctx, LogicalPlan
logicalPlan) throws AnalysisException {
+ if (!(logicalPlan instanceof InsertIntoTableCommand)) {
+ throw new AnalysisException("Only support insert command");
+ }
+ LogicalPlan logicalQuery = ((InsertIntoTableCommand)
logicalPlan).getLogicalQuery();
+ List<String> targetTable =
InsertUtils.getTargetTableQualified(logicalQuery, ctx);
+ Preconditions.checkArgument(targetTable.size() == 3, "target table
name is invalid");
+ if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx,
+ InternalCatalog.INTERNAL_CATALOG_NAME,
+ targetTable.get(1),
+ targetTable.get(2),
+ PrivPredicate.LOAD)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
"LOAD",
+ ctx.getQualifiedUser(),
+ ctx.getRemoteIP(),
+ targetTable.get(1),
+ targetTable.get(2));
+ }
+ return true;
+ }
+
+ public boolean checkPrivilege(ConnectContext ctx) throws AnalysisException
{
+ LogicalPlan logicalPlan = new
NereidsParser().parseSingle(getExecuteSql());
+ return checkPrivilege(ctx, logicalPlan);
+ }
+
+ public static boolean checkPrivilege(ConnectContext ctx, String sql)
throws AnalysisException {
+ LogicalPlan logicalPlan = new NereidsParser().parseSingle(sql);
+ return checkPrivilege(ctx, logicalPlan);
+ }
+
+ public boolean hasPrivilege(UserIdentity userIdentity) {
+ ConnectContext ctx = InsertTask.makeConnectContext(userIdentity,
getCurrentDbName());
+ try {
+ LogicalPlan logicalPlan = new
NereidsParser().parseSingle(getExecuteSql());
+ LogicalPlan logicalQuery = ((InsertIntoTableCommand)
logicalPlan).getLogicalQuery();
+ List<String> targetTable =
InsertUtils.getTargetTableQualified(logicalQuery, ctx);
+ Preconditions.checkArgument(targetTable.size() == 3, "target table
name is invalid");
+ return
Env.getCurrentEnv().getAccessManager().checkTblPriv(userIdentity,
+ InternalCatalog.INTERNAL_CATALOG_NAME,
+ targetTable.get(1),
+ targetTable.get(2),
+ PrivPredicate.LOAD);
+ } finally {
+ ctx.cleanup();
+ }
+ }
+
private String generateEncryptedSql() {
makeConnectContext();
TreeMap<Pair<Integer, Integer>, String> indexInSqlToString = new
TreeMap<>(new Pair.PairComparator<>());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 51132ad07ba..31d42d69725 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -475,12 +475,20 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
}
public T getJobByName(String jobName) throws JobException {
+ T job = getJobByNameOrNull(jobName);
+ if (job == null) {
+ throw new JobException("job not exist, jobName: " + jobName);
+ }
+ return job;
+ }
+
+ public T getJobByNameOrNull(String jobName) {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName)) {
return a;
}
}
- throw new JobException("job not exist, jobName:" + jobName);
+ return null;
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 5ded155f861..9aa5d202399 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Pair;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.JobStatus;
@@ -29,6 +30,7 @@ import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.PlanType;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertUtils;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
@@ -108,6 +110,8 @@ public class AlterJobCommand extends AlterCommand
implements ForwardWithSync, Ne
if
(job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)
&& job instanceof StreamingInsertJob) {
StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+ streamingJob.checkPrivilege(ConnectContext.get());
+
boolean proCheck = checkProperties(streamingJob.getProperties());
boolean sqlCheck = checkSql(streamingJob.getExecuteSql());
if (sqlCheck) {
@@ -124,11 +128,19 @@ public class AlterJobCommand extends AlterCommand
implements ForwardWithSync, Ne
/**
* Check if there are any unmodifiable properties in TVF
*/
- private void checkUnmodifiableProperties(String originExecuteSql) {
- UnboundTVFRelation originTvf = getTvf(originExecuteSql);
- UnboundTVFRelation inputTvf = getTvf(sql);
+ private void checkUnmodifiableProperties(String originExecuteSql) throws
AnalysisException {
+ Pair<List<String>, UnboundTVFRelation> origin =
getTargetTableAndTvf(originExecuteSql);
+ Pair<List<String>, UnboundTVFRelation> input =
getTargetTableAndTvf(sql);
+ UnboundTVFRelation originTvf = origin.second;
+ UnboundTVFRelation inputTvf = input.second;
+
+ Preconditions.checkArgument(Objects.equals(origin.first, input.first),
+ "The target table cannot be modified in ALTER JOB");
+
Preconditions.checkArgument(originTvf.getFunctionName().equalsIgnoreCase(inputTvf.getFunctionName()),
- "The tvf type %s cannot be modified in ALTER JOB", inputTvf);
+ "The tvf type cannot be modified in ALTER JOB: original=%s,
new=%s",
+ originTvf.getFunctionName(), inputTvf.getFunctionName());
+
switch (originTvf.getFunctionName().toLowerCase()) {
case "s3":
Preconditions.checkArgument(Objects.equals(originTvf.getProperties().getMap().get("uri"),
@@ -140,13 +152,18 @@ public class AlterJobCommand extends AlterCommand
implements ForwardWithSync, Ne
}
}
- private UnboundTVFRelation getTvf(String sql) {
+ private Pair<List<String>, UnboundTVFRelation> getTargetTableAndTvf(String
sql) throws AnalysisException {
LogicalPlan logicalPlan = new NereidsParser().parseSingle(sql);
+ if (!(logicalPlan instanceof InsertIntoTableCommand)) {
+ throw new AnalysisException("Only support insert command");
+ }
+ LogicalPlan logicalQuery = ((InsertIntoTableCommand)
logicalPlan).getLogicalQuery();
+ List<String> targetTable =
InsertUtils.getTargetTableQualified(logicalQuery, ConnectContext.get());
InsertIntoTableCommand baseCommand = (InsertIntoTableCommand)
logicalPlan;
List<UnboundTVFRelation> allTVFRelation =
baseCommand.getAllTVFRelation();
Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support
one source in insert streaming job");
UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
- return unboundTVFRelation;
+ return Pair.of(targetTable, unboundTVFRelation);
}
private boolean checkProperties(Map<String, String> originProps) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java
index 46cb24382e6..30672fef090 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java
@@ -22,6 +22,8 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -52,7 +54,7 @@ public abstract class AlterJobStatusCommand extends Command
implements ForwardWi
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws
Exception {
- validate();
+ validate(ctx);
doRun(ctx, executor);
}
@@ -61,15 +63,26 @@ public abstract class AlterJobStatusCommand extends Command
implements ForwardWi
return visitor.visitAlterJobStatusCommand(this, context);
}
- private void validate() throws Exception {
- if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
-
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
- }
+ private void validate(ConnectContext ctx) throws Exception {
+ checkAuth(ctx);
if (jobName.startsWith(excludeJobNamePrefix)) {
throw new AnalysisException("Can't alter inner job status");
}
}
+ protected void checkAuth(ConnectContext ctx) throws Exception {
+ AbstractJob job = ctx.getEnv().getJobManager().getJobByName(jobName);
+ if (job instanceof StreamingInsertJob) {
+ StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+ streamingJob.checkPrivilege(ConnectContext.get());
+ return;
+ }
+
+ if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
+ }
+ }
+
public abstract void doRun(ConnectContext ctx, StmtExecutor executor)
throws Exception;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java
index bff078d6332..3506e6c2558 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java
@@ -21,6 +21,9 @@ import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -49,9 +52,28 @@ public class DropJobCommand extends AlterJobStatusCommand
implements ForwardWith
}
public void doRun(ConnectContext ctx, StmtExecutor executor) throws
Exception {
+ ctx.getEnv().getJobManager().unregisterJob(super.getJobName(),
ifExists);
+ }
+
+ @Override
+ protected void checkAuth(ConnectContext ctx) throws Exception {
+ AbstractJob job =
ctx.getEnv().getJobManager().getJobByNameOrNull(super.getJobName());
+ if (job == null) {
+ if (!ifExists) {
+ throw new JobException("job not exist, jobName: " +
super.getJobName());
+ }
+ return;
+ }
+
+ if (job instanceof StreamingInsertJob) {
+ StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+ streamingJob.checkPrivilege(ConnectContext.get());
+ return;
+ }
+
if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
}
- ctx.getEnv().getJobManager().unregisterJob(super.getJobName(),
ifExists);
}
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
index 90de3c4564b..2b22a527293 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
@@ -18,11 +18,7 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.StmtType;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
import org.apache.doris.job.common.JobStatus;
-import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
@@ -43,9 +39,6 @@ public class PauseJobCommand extends AlterJobStatusCommand
implements ForwardWit
@Override
public void doRun(ConnectContext ctx, StmtExecutor executor) throws
Exception {
- if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
-
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
- }
ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.PAUSED);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
index 1ad21237a39..906f1c9d159 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
@@ -18,13 +18,9 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.StmtType;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
-import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
@@ -45,9 +41,6 @@ public class ResumeJobCommand extends AlterJobStatusCommand
implements ForwardWi
@Override
public void doRun(ConnectContext ctx, StmtExecutor executor) throws
Exception {
- if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
-
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
- }
AbstractJob job =
ctx.getEnv().getJobManager().getJobByName(super.getJobName());
if (job instanceof StreamingInsertJob) {
ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.PENDING);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index fecac5e135c..361be24ee07 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -211,7 +211,12 @@ public class CreateJobInfo {
startsTimeStampOptional.ifPresent(s ->
timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(s)));
}
- protected static void checkAuth() throws AnalysisException {
+ protected void checkAuth() throws AnalysisException {
+ if (streamingJob) {
+ StreamingInsertJob.checkPrivilege(ConnectContext.get(),
executeSql);
+ return;
+ }
+
if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
index d9c5ec5ed2b..ba55b25fb42 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
@@ -18,11 +18,9 @@
package org.apache.doris.tablefunction;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
-import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TJobsMetadataParams;
@@ -65,11 +63,6 @@ public class JobsTableValuedFunction extends
MetadataTableValuedFunction {
throw new AnalysisException("Invalid job metadata query");
}
this.jobType = jobType;
- if (jobType != JobType.MV) {
- if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
- throw new AnalysisException("only ADMIN priv can operate");
- }
- }
}
public static Integer getColumnIndexFromColumnName(String columnName,
TMetadataTableRequestParams params)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index d55c41828d1..2ce92cb1b1e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -67,6 +67,7 @@ import
org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.job.common.JobType;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.mtmv.BaseTableInfo;
@@ -1169,12 +1170,23 @@ public class MetadataGenerator {
List<org.apache.doris.job.base.AbstractJob> jobList =
Env.getCurrentEnv().getJobManager().queryJobs(jobType);
+ boolean hasAdmin =
Env.getCurrentEnv().getAccessManager().checkGlobalPriv(userIdentity,
PrivPredicate.ADMIN);
for (org.apache.doris.job.base.AbstractJob job : jobList) {
if (job instanceof MTMVJob) {
MTMVJob mtmvJob = (MTMVJob) job;
if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) {
continue;
}
+ } else if (job instanceof StreamingInsertJob) {
+ StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+ if (!streamingJob.hasPrivilege(userIdentity)) {
+ continue;
+ }
+ } else {
+ // common insert job
+ if (!hasAdmin) {
+ continue;
+ }
}
dataBatch.add(job.getTvfInfo());
}
@@ -1196,6 +1208,7 @@ public class MetadataGenerator {
List<TRow> dataBatch = Lists.newArrayList();
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+ boolean hasAdmin =
Env.getCurrentEnv().getAccessManager().checkGlobalPriv(userIdentity,
PrivPredicate.ADMIN);
List<org.apache.doris.job.base.AbstractJob> jobList =
Env.getCurrentEnv().getJobManager().queryJobs(jobType);
for (org.apache.doris.job.base.AbstractJob job : jobList) {
@@ -1204,6 +1217,16 @@ public class MetadataGenerator {
if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) {
continue;
}
+ } else if (job instanceof StreamingInsertJob) {
+ StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+ if (!streamingJob.hasPrivilege(userIdentity)) {
+ continue;
+ }
+ } else {
+ // common insert job
+ if (!hasAdmin) {
+ continue;
+ }
}
List<AbstractTask> tasks = job.queryAllTasks();
for (AbstractTask task : tasks) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
index b343a7dbeed..31018320595 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
@@ -18,11 +18,9 @@
package org.apache.doris.tablefunction;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
-import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TMetaScanRange;
@@ -65,11 +63,6 @@ public class TasksTableValuedFunction extends
MetadataTableValuedFunction {
throw new AnalysisException("Invalid task metadata query");
}
this.jobType = jobType;
- if (jobType != JobType.MV) {
- if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
- throw new AnalysisException("only ADMIN priv can operate");
- }
- }
}
public static Integer getColumnIndexFromColumnName(String columnName,
TMetadataTableRequestParams params)
diff --git a/regression-test/suites/auth_call/test_ddl_job_auth.groovy
b/regression-test/suites/auth_call/test_ddl_job_auth.groovy
index bbc554cec30..022309be7ce 100644
--- a/regression-test/suites/auth_call/test_ddl_job_auth.groovy
+++ b/regression-test/suites/auth_call/test_ddl_job_auth.groovy
@@ -24,10 +24,12 @@ suite("test_ddl_job_auth","p0,auth_call") {
String tableName = 'test_ddl_job_auth_tb'
String tableNameDst = 'test_ddl_job_auth_tb_dst'
String jobName = 'test_ddl_job_auth_job'
+ String jobNameTest = 'test_ddl_job_auth_job_test'
try_sql("DROP USER ${user}")
try_sql """drop database if exists ${dbName}"""
try_sql("""DROP JOB where jobName='${jobName}';""")
+ try_sql("""DROP JOB where jobName='${jobNameTest}';""")
sql """CREATE USER '${user}' IDENTIFIED BY '${pwd}'"""
//cloud-mode
if (isCloudMode()) {
@@ -51,6 +53,9 @@ suite("test_ddl_job_auth","p0,auth_call") {
);"""
sql """create table ${dbName}.${tableNameDst} like
${dbName}.${tableName}"""
+ // for job pause resume drop
+ sql """CREATE JOB ${jobNameTest} ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO
${dbName}.${tableNameDst} SELECT * FROM ${dbName}.${tableName};"""
+
// ddl create,show,drop
connect(user, "${pwd}", context.config.jdbcUrl) {
test {
@@ -58,24 +63,27 @@ suite("test_ddl_job_auth","p0,auth_call") {
exception "denied"
}
test {
- sql """PAUSE JOB where jobname='${jobName}';"""
+ sql """PAUSE JOB where jobname='${jobNameTest}';"""
exception "denied"
}
test {
- sql """RESUME JOB where jobName= '${jobName}';"""
+ sql """RESUME JOB where jobName= '${jobNameTest}';"""
exception "denied"
}
test {
- sql """DROP JOB where jobName='${jobName}';"""
+ sql """DROP JOB where jobName='${jobNameTest}';"""
exception "denied"
}
- test {
- sql """select * from jobs("type"="insert") where
Name="${jobName}";"""
- exception "ADMIN priv"
- }
+ def res = sql """select * from jobs("type"="insert") where
Name="${jobName}";"""
+ assertTrue(res.size() == 0)
}
+
+ sql """DROP JOB where jobName='${jobNameTest}';"""
+ def resCnt = sql """select * from jobs("type"="insert") where
Name="${jobNameTest}";"""
+ assertTrue(resCnt.size() == 0)
+
sql """grant admin_priv on *.*.* to ${user}"""
connect(user, "${pwd}", context.config.jdbcUrl) {
sql """CREATE JOB ${jobName} ON SCHEDULE AT '2100-01-01 00:00:00' DO
INSERT INTO ${dbName}.${tableNameDst} SELECT * FROM ${dbName}.${tableName};"""
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
index 123c8ab2659..9a4ac438a12 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
@@ -432,6 +432,25 @@ suite("test_streaming_insert_job_crud") {
"""
}, "The uri property cannot be modified in ALTER JOB")
+ // alter target table
+ expectExceptionLike({
+ sql """
+ ALTER JOB ${jobName}
+ INSERT INTO NoExistTable123
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "The target table cannot be modified in ALTER JOB")
+
/************************ delete **********************/
// drop pause job
pausedJobStatus = sql """
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_priv.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_priv.groovy
new file mode 100644
index 00000000000..efe1067a386
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_priv.groovy
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_priv") {
+ def tableName = "test_streaming_insert_job_priv_tbl"
+ def jobName = "test_streaming_insert_job_priv_name"
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // create user
+ def user = "test_streaming_insert_job_priv_user"
+ def pwd = '123456'
+ def dbName = context.config.getDbNameByFile(context.file)
+ sql """DROP USER IF EXISTS '${user}'"""
+ sql """CREATE USER '${user}' IDENTIFIED BY '${pwd}'"""
+ def tokens = context.config.jdbcUrl.split('/')
+ def url = tokens[0] + "//" + tokens[2] + "/" + dbName + "?"
+ sql """grant select_priv on ${dbName}.* to ${user}"""
+
+ if (isCloudMode()){
+ // Cloud requires USAGE_PRIV to show clusters.
+ def clusters = sql_return_maparray """show clusters"""
+ log.info("show cluster res: " + clusters)
+ assertNotNull(clusters)
+
+ for (item in clusters) {
+ log.info("cluster item: " + item.is_current + ", " + item.cluster)
+ if (item.is_current.equalsIgnoreCase("TRUE")) {
+ sql """GRANT USAGE_PRIV ON CLUSTER `${item.cluster}` TO
${user}""";
+ break
+ }
+ }
+ }
+
+ // create job with select priv user
+ connect(user, "${pwd}", url) {
+ expectExceptionLike({
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "LOAD command denied to user")
+ }
+
+ def jobCount = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ assert jobCount.size() == 0
+
+ // create streaming job by load_priv
+ sql """grant load_priv on ${dbName}.${tableName} to ${user}"""
+ connect(user, "${pwd}", url) {
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES(
+ "s3.max_batch_files" = "1"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ // check job status and succeed task count larger than 2
+ jobSuccendCount.size() == 1 && '2' <=
jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def jobResult = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ log.info("show success job: " + jobResult)
+ // revoke load_priv
+ sql """REVOKE load_priv on ${dbName}.${tableName} FROM ${user}"""
+
+ connect(user, "${pwd}", url) {
+ jobCount = sql """SELECT * from jobs("type"="insert") where
Name='${jobName}'"""
+ assert jobCount.size() == 0
+
+ expectExceptionLike({
+ sql """PAUSE JOB where jobname = '${jobName}'"""
+ }, "LOAD command denied to user")
+
+ expectExceptionLike({
+ sql """RESUME JOB where jobname = '${jobName}'"""
+ }, "LOAD command denied to user")
+
+ expectExceptionLike({
+ sql """DROP JOB where jobname = '${jobName}'"""
+ }, "LOAD command denied to user")
+ }
+
+ // grant
+ sql """grant load_priv on ${dbName}.${tableName} to ${user}"""
+ connect(user, "${pwd}", url) {
+ jobCount = sql """SELECT * from jobs("type"="insert") where
Name='${jobName}'"""
+ assert jobCount.size() == 1
+
+ sql """PAUSE JOB where jobname = '${jobName}'"""
+ def pausedJobStatus = sql """
+ select status from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert pausedJobStatus.get(0).get(0) == "PAUSED"
+ sql """RESUME JOB where jobname = '${jobName}'"""
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobRunning = sql """ select status from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("check job running: " + jobRunning)
+ // check job status running
+ jobRunning.size() == 1 && jobRunning.get(0).get(0) ==
'RUNNING'
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def resumedJobStatus = sql """
+ select status from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert resumedJobStatus.get(0).get(0) == "RUNNING"
+
+ sql """DROP JOB where jobname = '${jobName}'"""
+ jobCount = sql """SELECT * from jobs("type"="insert") where
Name='${jobName}'"""
+ assert jobCount.size() == 0
+ }
+
+ sql """DROP USER IF EXISTS '${user}'"""
+
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert") where
Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]