This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch add-priv-streamingjob in repository https://gitbox.apache.org/repos/asf/doris.git
commit 516226d83ab23bc85b998e88caa1e89322b1ba19 Author: wudi <[email protected]> AuthorDate: Tue Oct 14 19:02:36 2025 +0800 add priv check for streaming job --- .../insert/streaming/StreamingInsertJob.java | 49 ++++++ .../org/apache/doris/job/manager/JobManager.java | 10 +- .../trees/plans/commands/AlterJobCommand.java | 28 ++- .../plans/commands/AlterJobStatusCommand.java | 23 ++- .../trees/plans/commands/DropJobCommand.java | 24 ++- .../trees/plans/commands/PauseJobCommand.java | 7 - .../trees/plans/commands/ResumeJobCommand.java | 7 - .../trees/plans/commands/info/CreateJobInfo.java | 7 +- .../tablefunction/JobsTableValuedFunction.java | 7 - .../doris/tablefunction/MetadataGenerator.java | 23 +++ .../tablefunction/TasksTableValuedFunction.java | 7 - .../org/apache/doris/regression/suite/Suite.groovy | 1 + .../test_streaming_insert_job_crud.groovy | 19 +++ .../test_streaming_insert_job_priv.groovy | 189 +++++++++++++++++++++ 14 files changed, 359 insertions(+), 42 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 010b27ff4ed..a46fe9035b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -23,11 +23,14 @@ import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecutionConfiguration; import org.apache.doris.job.base.TimerDefinition; @@ -44,11 +47,13 @@ import org.apache.doris.job.offset.SourceOffsetProvider; import org.apache.doris.job.offset.SourceOffsetProviderFactory; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.analyzer.UnboundTVFRelation; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.info.BaseViewInfo; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertUtils; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; @@ -409,6 +414,50 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M return trow; } + private static boolean checkPrivilege(ConnectContext ctx, LogicalPlan logicalPlan) throws AnalysisException { + if (!(logicalPlan instanceof InsertIntoTableCommand)) { + throw new AnalysisException("Only support insert command"); + } + LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery(); + List<String> targetTable = InsertUtils.getTargetTableQualified(logicalQuery, ctx); + Preconditions.checkArgument(targetTable.size() == 3, "target table name is invalid"); + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, + InternalCatalog.INTERNAL_CATALOG_NAME, + targetTable.get(1), + targetTable.get(2), + PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), + targetTable.get(1), + targetTable.get(2)); + } + return true; + } + + public boolean checkPrivilege(ConnectContext ctx) throws AnalysisException { + LogicalPlan logicalPlan = new NereidsParser().parseSingle(getExecuteSql()); + return checkPrivilege(ctx, logicalPlan); + } + + public static boolean checkPrivilege(ConnectContext ctx, String sql) throws AnalysisException { + LogicalPlan logicalPlan = new NereidsParser().parseSingle(sql); + return checkPrivilege(ctx, logicalPlan); + } + + public boolean hasPrivilege(UserIdentity userIdentity) { + ConnectContext ctx = InsertTask.makeConnectContext(userIdentity, getCurrentDbName()); + LogicalPlan logicalPlan = new NereidsParser().parseSingle(getExecuteSql()); + LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery(); + List<String> targetTable = InsertUtils.getTargetTableQualified(logicalQuery, ctx); + Preconditions.checkArgument(targetTable.size() == 3, "target table name is invalid"); + return Env.getCurrentEnv().getAccessManager().checkTblPriv(userIdentity, + InternalCatalog.INTERNAL_CATALOG_NAME, + targetTable.get(1), + targetTable.get(2), + PrivPredicate.LOAD); + } + private String generateEncryptedSql() { makeConnectContext(); TreeMap<Pair<Integer, Integer>, String> indexInSqlToString = new TreeMap<>(new Pair.PairComparator<>()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 51132ad07ba..31d42d69725 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -475,12 +475,20 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable { } public T getJobByName(String jobName) throws JobException { + T job = getJobByNameOrNull(jobName); + if (job == null) { + throw new JobException("job not exist, jobName: " + jobName); + } + return job; + } + + public T getJobByNameOrNull(String jobName) { for (T a : jobMap.values()) { if (a.getJobName().equals(jobName)) { return a; } } - throw new JobException("job not exist, jobName:" + jobName); + return null; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java index 5ded155f861..96617c4eb75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java @@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.analysis.StmtType; import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.JobExecuteType; import org.apache.doris.job.common.JobStatus; @@ -29,6 +30,7 @@ import org.apache.doris.nereids.analyzer.UnboundTVFRelation; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertUtils; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -108,6 +110,8 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING) && job instanceof StreamingInsertJob) { StreamingInsertJob streamingJob = (StreamingInsertJob) job; + streamingJob.checkPrivilege(ConnectContext.get()); + boolean proCheck = checkProperties(streamingJob.getProperties()); boolean sqlCheck = checkSql(streamingJob.getExecuteSql()); if (sqlCheck) { @@ -124,11 +128,18 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne /** * Check if there are any unmodifiable properties in TVF */ - private void checkUnmodifiableProperties(String originExecuteSql) { - UnboundTVFRelation originTvf = getTvf(originExecuteSql); - UnboundTVFRelation inputTvf = getTvf(sql); + private void checkUnmodifiableProperties(String originExecuteSql) throws AnalysisException { + Pair<List<String>, UnboundTVFRelation> origin = getTargetTableAndTvf(originExecuteSql); + Pair<List<String>, UnboundTVFRelation> input = getTargetTableAndTvf(sql); + UnboundTVFRelation originTvf = origin.second; + UnboundTVFRelation inputTvf = input.second; + + Preconditions.checkArgument(Objects.equals(origin.first, input.first), + "The target table cannot be modified in ALTER JOB"); + Preconditions.checkArgument(originTvf.getFunctionName().equalsIgnoreCase(inputTvf.getFunctionName()), - "The tvf type %s cannot be modified in ALTER JOB", inputTvf); + "The tvf type %s cannot be modified in ALTER JOB", inputTvf.getFunctionName()); + switch (originTvf.getFunctionName().toLowerCase()) { case "s3": Preconditions.checkArgument(Objects.equals(originTvf.getProperties().getMap().get("uri"), @@ -140,13 +151,18 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne } } - private UnboundTVFRelation getTvf(String sql) { + private Pair<List<String>, UnboundTVFRelation> getTargetTableAndTvf(String sql) throws AnalysisException { LogicalPlan logicalPlan = new NereidsParser().parseSingle(sql); + if (!(logicalPlan instanceof InsertIntoTableCommand)) { + throw new AnalysisException("Only support insert command"); + } + LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery(); + List<String> targetTable = InsertUtils.getTargetTableQualified(logicalQuery, ConnectContext.get()); InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) logicalPlan; List<UnboundTVFRelation> allTVFRelation = baseCommand.getAllTVFRelation(); Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support one source in insert streaming job"); UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0); - return unboundTVFRelation; + return Pair.of(targetTable, unboundTVFRelation); } private boolean checkProperties(Map<String, String> originProps) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java index 46cb24382e6..30672fef090 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java @@ -22,6 +22,8 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -52,7 +54,7 @@ public abstract class AlterJobStatusCommand extends Command implements ForwardWi @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { - validate(); + validate(ctx); doRun(ctx, executor); } @@ -61,15 +63,26 @@ public abstract class AlterJobStatusCommand extends Command implements ForwardWi return visitor.visitAlterJobStatusCommand(this, context); } - private void validate() throws Exception { - if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); - } + private void validate(ConnectContext ctx) throws Exception { + checkAuth(ctx); if (jobName.startsWith(excludeJobNamePrefix)) { throw new AnalysisException("Can't alter inner job status"); } } + protected void checkAuth(ConnectContext ctx) throws Exception { + AbstractJob job = ctx.getEnv().getJobManager().getJobByName(jobName); + if (job instanceof StreamingInsertJob) { + StreamingInsertJob streamingJob = (StreamingInsertJob) job; + streamingJob.checkPrivilege(ConnectContext.get()); + return; + } + + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } + public abstract void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java index bff078d6332..3506e6c2558 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java @@ -21,6 +21,9 @@ import org.apache.doris.analysis.StmtType; import org.apache.doris.catalog.Env; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -49,9 +52,28 @@ public class DropJobCommand extends AlterJobStatusCommand implements ForwardWith } public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { + ctx.getEnv().getJobManager().unregisterJob(super.getJobName(), ifExists); + } + + @Override + protected void checkAuth(ConnectContext ctx) throws Exception { + AbstractJob job = ctx.getEnv().getJobManager().getJobByNameOrNull(super.getJobName()); + if (job == null) { + if (!ifExists) { + throw new JobException("job not exist, jobName: " + super.getJobName()); + } + return; + } + + if (job instanceof StreamingInsertJob) { + StreamingInsertJob streamingJob = (StreamingInsertJob) job; + streamingJob.checkPrivilege(ConnectContext.get()); + return; + } + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); } - ctx.getEnv().getJobManager().unregisterJob(super.getJobName(), ifExists); } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java index 90de3c4564b..2b22a527293 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java @@ -18,11 +18,7 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.analysis.StmtType; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; import org.apache.doris.job.common.JobStatus; -import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -43,9 +39,6 @@ public class PauseJobCommand extends AlterJobStatusCommand implements ForwardWit @Override public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { - if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); - } ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PAUSED); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java index 1ad21237a39..906f1c9d159 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java @@ -18,13 +18,9 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.analysis.StmtType; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; -import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -45,9 +41,6 @@ public class ResumeJobCommand extends AlterJobStatusCommand implements ForwardWi @Override public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { - if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); - } AbstractJob job = ctx.getEnv().getJobManager().getJobByName(super.getJobName()); if (job instanceof StreamingInsertJob) { ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PENDING); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java index 9493cf5d6d7..c6ebba5d461 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java @@ -214,7 +214,12 @@ public class CreateJobInfo { startsTimeStampOptional.ifPresent(s -> timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(s))); } - protected static void checkAuth() throws AnalysisException { + protected void checkAuth() throws AnalysisException { + if (streamingJob) { + StreamingInsertJob.checkPrivilege(ConnectContext.get(), executeSql); + return; + } + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java index d9c5ec5ed2b..ba55b25fb42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java @@ -18,11 +18,9 @@ package org.apache.doris.tablefunction; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; import org.apache.doris.job.common.JobType; import org.apache.doris.job.extensions.insert.InsertJob; import org.apache.doris.job.extensions.mtmv.MTMVJob; -import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TJobsMetadataParams; @@ -65,11 +63,6 @@ public class JobsTableValuedFunction extends MetadataTableValuedFunction { throw new AnalysisException("Invalid job metadata query"); } this.jobType = jobType; - if (jobType != JobType.MV) { - if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { - throw new AnalysisException("only ADMIN priv can operate"); - } - } } public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params) diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index d55c41828d1..2ce92cb1b1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -67,6 +67,7 @@ import org.apache.doris.datasource.iceberg.IcebergMetadataCache; import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.job.common.JobType; +import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; import org.apache.doris.job.extensions.mtmv.MTMVJob; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.mtmv.BaseTableInfo; @@ -1169,12 +1170,23 @@ public class MetadataGenerator { List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType); + boolean hasAdmin = Env.getCurrentEnv().getAccessManager().checkGlobalPriv(userIdentity, PrivPredicate.ADMIN); for (org.apache.doris.job.base.AbstractJob job : jobList) { if (job instanceof MTMVJob) { MTMVJob mtmvJob = (MTMVJob) job; if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) { continue; } + } else if (job instanceof StreamingInsertJob) { + StreamingInsertJob streamingJob = (StreamingInsertJob) job; + if (!streamingJob.hasPrivilege(userIdentity)) { + continue; + } + } else { + // common insert job + if (!hasAdmin) { + continue; + } } dataBatch.add(job.getTvfInfo()); } @@ -1196,6 +1208,7 @@ public class MetadataGenerator { List<TRow> dataBatch = Lists.newArrayList(); TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + boolean hasAdmin = Env.getCurrentEnv().getAccessManager().checkGlobalPriv(userIdentity, PrivPredicate.ADMIN); List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType); for (org.apache.doris.job.base.AbstractJob job : jobList) { @@ -1204,6 +1217,16 @@ public class MetadataGenerator { if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) { continue; } + } else if (job instanceof StreamingInsertJob) { + StreamingInsertJob streamingJob = (StreamingInsertJob) job; + if (!streamingJob.hasPrivilege(userIdentity)) { + continue; + } + } else { + // common insert job + if (!hasAdmin) { + continue; + } } List<AbstractTask> tasks = job.queryAllTasks(); for (AbstractTask task : tasks) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java index b343a7dbeed..31018320595 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java @@ -18,11 +18,9 @@ package org.apache.doris.tablefunction; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; import org.apache.doris.job.common.JobType; import org.apache.doris.job.extensions.insert.InsertTask; import org.apache.doris.job.extensions.mtmv.MTMVTask; -import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TMetaScanRange; @@ -65,11 +63,6 @@ public class TasksTableValuedFunction extends MetadataTableValuedFunction { throw new AnalysisException("Invalid task metadata query"); } this.jobType = jobType; - if (jobType != JobType.MV) { - if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { - throw new AnalysisException("only ADMIN priv can operate"); - } - } } public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params) diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index d94e25e5caf..47d7cda3423 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -945,6 +945,7 @@ class Suite implements GroovyInterceptable { void expectExceptionLike(Closure userFunction, String errMsg = null, String errMsg2 = null) { try { userFunction() + throw new RuntimeException("no exception thrown") } catch (Exception e) { if (!Strings.isNullOrEmpty(errMsg) && !e.getMessage().contains(errMsg)) { throw e diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy index 123c8ab2659..9a4ac438a12 100644 --- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy @@ -432,6 +432,25 @@ suite("test_streaming_insert_job_crud") { """ }, "The uri property cannot be modified in ALTER JOB") + // alter target table + expectExceptionLike({ + sql """ + ALTER JOB ${jobName} + INSERT INTO NoExistTable123 + SELECT * FROM S3 + ( + "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv", + "format" = "csv", + "provider" = "${getS3Provider()}", + "column_separator" = ",", + "s3.endpoint" = "${getS3Endpoint()}", + "s3.region" = "${getS3Region()}", + "s3.access_key" = "${getS3AK()}", + "s3.secret_key" = "${getS3SK()}" + ); + """ + }, "The target table cannot be modified in ALTER JOB") + /************************ delete **********************/ // drop pause job pausedJobStatus = sql """ diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_priv.groovy new file mode 100644 index 00000000000..f4d0ff001e7 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_priv.groovy @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_insert_job_priv") { + def tableName = "test_streaming_insert_job_priv_tbl" + def jobName = "test_streaming_insert_job_priv_name" + + sql """drop table if exists `${tableName}` force""" + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `c1` int NULL, + `c2` string NULL, + `c3` int NULL, + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`c1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + // create user + def user = "test_streaming_insert_job_priv_user" + def pwd = '123456' + def dbName = context.config.getDbNameByFile(context.file) + sql """DROP USER IF EXISTS '${user}'""" + sql """CREATE USER '${user}' IDENTIFIED BY '${pwd}'""" + def tokens = context.config.jdbcUrl.split('/') + def url = tokens[0] + "//" + tokens[2] + "/" + dbName + "?" + sql """grant select_priv on ${dbName}.* to ${user}""" + + // create job with select priv user + connect(user, "${pwd}", url) { + expectExceptionLike({ + sql """ + CREATE JOB ${jobName} + ON STREAMING DO INSERT INTO ${tableName} + SELECT * FROM S3 + ( + "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv", + "format" = "csv", + "provider" = "${getS3Provider()}", + "column_separator" = ",", + "s3.endpoint" = "${getS3Endpoint()}", + "s3.region" = "${getS3Region()}", + "s3.access_key" = "${getS3AK()}", + "s3.secret_key" = "${getS3SK()}" + ); + """ + }, "LOAD command denied to user") + } + + def jobCount = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + assert jobCount.size() == 0 + + // create streaming job by load_priv + sql """grant load_priv on ${dbName}.${tableName} to ${user}""" + connect(user, "${pwd}", url) { + sql """ + CREATE JOB ${jobName} + PROPERTIES( + "s3.max_batch_files" = "1" + ) + ON STREAMING DO INSERT INTO ${tableName} + SELECT * FROM S3 + ( + "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv", + "format" = "csv", + "provider" = "${getS3Provider()}", + "column_separator" = ",", + "s3.endpoint" = "${getS3Endpoint()}", + "s3.region" = "${getS3Region()}", + "s3.access_key" = "${getS3AK()}", + "s3.secret_key" = "${getS3SK()}" + ); + """ + } + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + // check job status and succeed task count larger than 2 + jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0) + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + def jobResult = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + log.info("show success job: " + jobResult) + // revoke load_priv + sql """REVOKE load_priv on ${dbName}.${tableName} FROM ${user}""" + + connect(user, "${pwd}", url) { + jobCount = sql """SELECT * from jobs("type"="insert") where Name='${jobName}'""" + assert jobCount.size() == 0 + + expectExceptionLike({ + sql """PAUSE JOB where jobname = '${jobName}'""" + }, "LOAD command denied to user") + + expectExceptionLike({ + sql """RESUME JOB where jobname = '${jobName}'""" + }, "LOAD command denied to user") + + expectExceptionLike({ + sql """DROP JOB where jobname = '${jobName}'""" + }, "LOAD command denied to user") + } + + // grant + sql """grant load_priv on ${dbName}.${tableName} to ${user}""" + connect(user, "${pwd}", url) { + jobCount = sql """SELECT * from jobs("type"="insert") where Name='${jobName}'""" + assert jobCount.size() == 1 + + sql """PAUSE JOB where jobname = '${jobName}'""" + def pausedJobStatus = sql """ + select status from jobs("type"="insert") where Name='${jobName}' + """ + assert pausedJobStatus.get(0).get(0) == "PAUSED" + sql """RESUME JOB where jobname = '${jobName}'""" + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobRunning = sql """ select status from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("check job running: " + jobRunning) + // check job status running + jobRunning.size() == 1 && jobRunning.get(0).get(0) == 'RUNNING' + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + def resumedJobStatus = sql """ + select status from jobs("type"="insert") where Name='${jobName}' + """ + assert resumedJobStatus.get(0).get(0) == "RUNNING" + + sql """DROP JOB where jobname = '${jobName}'""" + jobCount = sql """SELECT * from jobs("type"="insert") where Name='${jobName}'""" + assert jobCount.size() == 0 + } + + sql """DROP USER IF EXISTS '${user}'""" + + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
