This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 8a7f5c45d5f branch-4.0: [Improve](job) add priv check for streaming job #56965 (#57068)
8a7f5c45d5f is described below

commit 8a7f5c45d5f2c673d3f48dfb78de9f6c59f1a71c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Oct 18 10:35:24 2025 +0800

    branch-4.0: [Improve](job) add priv check for streaming job #56965 (#57068)
    
    Cherry-picked from #56965
    
    Co-authored-by: wudi <[email protected]>
---
 .../insert/streaming/StreamingInsertJob.java       |  53 ++++++
 .../org/apache/doris/job/manager/JobManager.java   |  10 +-
 .../trees/plans/commands/AlterJobCommand.java      |  29 ++-
 .../plans/commands/AlterJobStatusCommand.java      |  23 ++-
 .../trees/plans/commands/DropJobCommand.java       |  24 ++-
 .../trees/plans/commands/PauseJobCommand.java      |   7 -
 .../trees/plans/commands/ResumeJobCommand.java     |   7 -
 .../trees/plans/commands/info/CreateJobInfo.java   |   7 +-
 .../tablefunction/JobsTableValuedFunction.java     |   7 -
 .../doris/tablefunction/MetadataGenerator.java     |  23 +++
 .../tablefunction/TasksTableValuedFunction.java    |   7 -
 .../suites/auth_call/test_ddl_job_auth.groovy      |  22 ++-
 .../test_streaming_insert_job_crud.groovy          |  19 ++
 .../test_streaming_insert_job_priv.groovy          | 204 +++++++++++++++++++++
 14 files changed, 393 insertions(+), 49 deletions(-)
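
In effect, streaming insert jobs are now gated by a table-level LOAD check (see the checkTblPriv(..., PrivPredicate.LOAD) calls added below) rather than the previous global ADMIN check. A minimal sketch, using illustrative names modeled on the new regression test:

    grant load_priv on example_db.example_tbl to example_user;
    -- LOAD_PRIV on the target table, rather than global ADMIN_PRIV, is now what gates
    -- create/pause/resume/alter/drop of a streaming job inserting into example_tbl,
    -- and such jobs appear in: select * from jobs("type"="insert");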

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 010b27ff4ed..fb7568d7637 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -23,11 +23,14 @@ import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecutionConfiguration;
 import org.apache.doris.job.base.TimerDefinition;
@@ -44,11 +47,13 @@ import org.apache.doris.job.offset.SourceOffsetProvider;
 import org.apache.doris.job.offset.SourceOffsetProviderFactory;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.commands.info.BaseViewInfo;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertUtils;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -409,6 +414,54 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         return trow;
     }
 
+    private static boolean checkPrivilege(ConnectContext ctx, LogicalPlan logicalPlan) throws AnalysisException {
+        if (!(logicalPlan instanceof InsertIntoTableCommand)) {
+            throw new AnalysisException("Only support insert command");
+        }
+        LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery();
+        List<String> targetTable = InsertUtils.getTargetTableQualified(logicalQuery, ctx);
+        Preconditions.checkArgument(targetTable.size() == 3, "target table name is invalid");
+        if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx,
+                InternalCatalog.INTERNAL_CATALOG_NAME,
+                targetTable.get(1),
+                targetTable.get(2),
+                PrivPredicate.LOAD)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
+                    ctx.getQualifiedUser(),
+                    ctx.getRemoteIP(),
+                    targetTable.get(1),
+                    targetTable.get(2));
+        }
+        return true;
+    }
+
+    public boolean checkPrivilege(ConnectContext ctx) throws AnalysisException {
+        LogicalPlan logicalPlan = new NereidsParser().parseSingle(getExecuteSql());
+        return checkPrivilege(ctx, logicalPlan);
+    }
+
+    public static boolean checkPrivilege(ConnectContext ctx, String sql) throws AnalysisException {
+        LogicalPlan logicalPlan = new NereidsParser().parseSingle(sql);
+        return checkPrivilege(ctx, logicalPlan);
+    }
+
+    public boolean hasPrivilege(UserIdentity userIdentity) {
+        ConnectContext ctx = InsertTask.makeConnectContext(userIdentity, getCurrentDbName());
+        try {
+            LogicalPlan logicalPlan = new NereidsParser().parseSingle(getExecuteSql());
+            LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery();
+            List<String> targetTable = InsertUtils.getTargetTableQualified(logicalQuery, ctx);
+            Preconditions.checkArgument(targetTable.size() == 3, "target table name is invalid");
+            return Env.getCurrentEnv().getAccessManager().checkTblPriv(userIdentity,
+                    InternalCatalog.INTERNAL_CATALOG_NAME,
+                    targetTable.get(1),
+                    targetTable.get(2),
+                    PrivPredicate.LOAD);
+        } finally {
+            ctx.cleanup();
+        }
+    }
+
     private String generateEncryptedSql() {
         makeConnectContext();
         TreeMap<Pair<Integer, Integer>, String> indexInSqlToString = new TreeMap<>(new Pair.PairComparator<>());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 51132ad07ba..31d42d69725 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -475,12 +475,20 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
     }
 
     public T getJobByName(String jobName) throws JobException {
+        T job = getJobByNameOrNull(jobName);
+        if (job == null) {
+            throw new JobException("job not exist, jobName: " + jobName);
+        }
+        return job;
+    }
+
+    public T getJobByNameOrNull(String jobName) {
         for (T a : jobMap.values()) {
             if (a.getJobName().equals(jobName)) {
                 return a;
             }
         }
-        throw new JobException("job not exist, jobName:" + jobName);
+        return null;
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 5ded155f861..9aa5d202399 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.commands;
 import org.apache.doris.analysis.StmtType;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.common.JobStatus;
@@ -29,6 +30,7 @@ import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertUtils;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.qe.ConnectContext;
@@ -108,6 +110,8 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne
         if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)
                 && job instanceof StreamingInsertJob) {
             StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+            streamingJob.checkPrivilege(ConnectContext.get());
+
             boolean proCheck = checkProperties(streamingJob.getProperties());
             boolean sqlCheck = checkSql(streamingJob.getExecuteSql());
             if (sqlCheck) {
@@ -124,11 +128,19 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne
     /**
      * Check if there are any unmodifiable properties in TVF
      */
-    private void checkUnmodifiableProperties(String originExecuteSql) {
-        UnboundTVFRelation originTvf = getTvf(originExecuteSql);
-        UnboundTVFRelation inputTvf = getTvf(sql);
+    private void checkUnmodifiableProperties(String originExecuteSql) throws AnalysisException {
+        Pair<List<String>, UnboundTVFRelation> origin = getTargetTableAndTvf(originExecuteSql);
+        Pair<List<String>, UnboundTVFRelation> input = getTargetTableAndTvf(sql);
+        UnboundTVFRelation originTvf = origin.second;
+        UnboundTVFRelation inputTvf = input.second;
+
+        Preconditions.checkArgument(Objects.equals(origin.first, input.first),
+                "The target table cannot be modified in ALTER JOB");
+
         Preconditions.checkArgument(originTvf.getFunctionName().equalsIgnoreCase(inputTvf.getFunctionName()),
-                "The tvf type %s cannot be modified in ALTER JOB", inputTvf);
+                "The tvf type cannot be modified in ALTER JOB: original=%s, new=%s",
+                originTvf.getFunctionName(), inputTvf.getFunctionName());
+
         switch (originTvf.getFunctionName().toLowerCase()) {
             case "s3":
                 Preconditions.checkArgument(Objects.equals(originTvf.getProperties().getMap().get("uri"),
@@ -140,13 +152,18 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne
         }
     }
 
-    private UnboundTVFRelation getTvf(String sql) {
+    private Pair<List<String>, UnboundTVFRelation> getTargetTableAndTvf(String sql) throws AnalysisException {
         LogicalPlan logicalPlan = new NereidsParser().parseSingle(sql);
+        if (!(logicalPlan instanceof InsertIntoTableCommand)) {
+            throw new AnalysisException("Only support insert command");
+        }
+        LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery();
+        List<String> targetTable = InsertUtils.getTargetTableQualified(logicalQuery, ConnectContext.get());
         InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) logicalPlan;
         List<UnboundTVFRelation> allTVFRelation = baseCommand.getAllTVFRelation();
         Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support one source in insert streaming job");
         UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
-        return unboundTVFRelation;
+        return Pair.of(targetTable, unboundTVFRelation);
     }
 
     private boolean checkProperties(Map<String, String> originProps) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java
index 46cb24382e6..30672fef090 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java
@@ -22,6 +22,8 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -52,7 +54,7 @@ public abstract class AlterJobStatusCommand extends Command implements ForwardWi
 
     @Override
     public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
-        validate();
+        validate(ctx);
         doRun(ctx, executor);
     }
 
@@ -61,15 +63,26 @@ public abstract class AlterJobStatusCommand extends Command implements ForwardWi
         return visitor.visitAlterJobStatusCommand(this, context);
     }
 
-    private void validate() throws Exception {
-        if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
-        }
+    private void validate(ConnectContext ctx) throws Exception {
+        checkAuth(ctx);
         if (jobName.startsWith(excludeJobNamePrefix)) {
             throw new AnalysisException("Can't alter inner job status");
         }
     }
 
+    protected void checkAuth(ConnectContext ctx) throws Exception {
+        AbstractJob job = ctx.getEnv().getJobManager().getJobByName(jobName);
+        if (job instanceof StreamingInsertJob) {
+            StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+            streamingJob.checkPrivilege(ConnectContext.get());
+            return;
+        }
+
+        if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+        }
+    }
+
     public abstract void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception;
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java
index bff078d6332..3506e6c2558 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java
@@ -21,6 +21,9 @@ import org.apache.doris.analysis.StmtType;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -49,9 +52,28 @@ public class DropJobCommand extends AlterJobStatusCommand implements ForwardWith
     }
 
     public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        ctx.getEnv().getJobManager().unregisterJob(super.getJobName(), ifExists);
+    }
+
+    @Override
+    protected void checkAuth(ConnectContext ctx) throws Exception {
+        AbstractJob job = ctx.getEnv().getJobManager().getJobByNameOrNull(super.getJobName());
+        if (job == null) {
+            if (!ifExists) {
+                throw new JobException("job not exist, jobName: " + super.getJobName());
+            }
+            return;
+        }
+
+        if (job instanceof StreamingInsertJob) {
+            StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+            streamingJob.checkPrivilege(ConnectContext.get());
+            return;
+        }
+
         if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
         }
-        ctx.getEnv().getJobManager().unregisterJob(super.getJobName(), ifExists);
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
index 90de3c4564b..2b22a527293 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
@@ -18,11 +18,7 @@
 package org.apache.doris.nereids.trees.plans.commands;
 
 import org.apache.doris.analysis.StmtType;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
 import org.apache.doris.job.common.JobStatus;
-import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.qe.ConnectContext;
@@ -43,9 +39,6 @@ public class PauseJobCommand extends AlterJobStatusCommand implements ForwardWit
 
     @Override
     public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
-        if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
-        }
         ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PAUSED);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
index 1ad21237a39..906f1c9d159 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
@@ -18,13 +18,9 @@
 package org.apache.doris.nereids.trees.plans.commands;
 
 import org.apache.doris.analysis.StmtType;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
-import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.qe.ConnectContext;
@@ -45,9 +41,6 @@ public class ResumeJobCommand extends AlterJobStatusCommand implements ForwardWi
 
     @Override
     public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
-        if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
-        }
         AbstractJob job = ctx.getEnv().getJobManager().getJobByName(super.getJobName());
         if (job instanceof StreamingInsertJob) {
             ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PENDING);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index fecac5e135c..361be24ee07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -211,7 +211,12 @@ public class CreateJobInfo {
         startsTimeStampOptional.ifPresent(s -> timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(s)));
     }
 
-    protected static void checkAuth() throws AnalysisException {
+    protected void checkAuth() throws AnalysisException {
+        if (streamingJob) {
+            StreamingInsertJob.checkPrivilege(ConnectContext.get(), executeSql);
+            return;
+        }
+
         if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
index d9c5ec5ed2b..ba55b25fb42 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
@@ -18,11 +18,9 @@
 package org.apache.doris.tablefunction;
 
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.extensions.insert.InsertJob;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
-import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TJobsMetadataParams;
@@ -65,11 +63,6 @@ public class JobsTableValuedFunction extends MetadataTableValuedFunction {
             throw new AnalysisException("Invalid job metadata query");
         }
         this.jobType = jobType;
-        if (jobType != JobType.MV) {
-            if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
-                throw new AnalysisException("only ADMIN priv can operate");
-            }
-        }
     }
 
     public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index d55c41828d1..2ce92cb1b1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -67,6 +67,7 @@ import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
 import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.job.common.JobType;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.job.task.AbstractTask;
 import org.apache.doris.mtmv.BaseTableInfo;
@@ -1169,12 +1170,23 @@ public class MetadataGenerator {
 
         List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType);
 
+        boolean hasAdmin = Env.getCurrentEnv().getAccessManager().checkGlobalPriv(userIdentity, PrivPredicate.ADMIN);
         for (org.apache.doris.job.base.AbstractJob job : jobList) {
             if (job instanceof MTMVJob) {
                 MTMVJob mtmvJob = (MTMVJob) job;
                 if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) {
                     continue;
                 }
+            } else if (job instanceof StreamingInsertJob) {
+                StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+                if (!streamingJob.hasPrivilege(userIdentity)) {
+                    continue;
+                }
+            } else {
+                // common insert job
+                if (!hasAdmin) {
+                    continue;
+                }
             }
             dataBatch.add(job.getTvfInfo());
         }
@@ -1196,6 +1208,7 @@ public class MetadataGenerator {
         List<TRow> dataBatch = Lists.newArrayList();
         TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
 
+        boolean hasAdmin = Env.getCurrentEnv().getAccessManager().checkGlobalPriv(userIdentity, PrivPredicate.ADMIN);
         List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType);
 
         for (org.apache.doris.job.base.AbstractJob job : jobList) {
@@ -1204,6 +1217,16 @@ public class MetadataGenerator {
                 if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) {
                     continue;
                 }
+            } else if (job instanceof StreamingInsertJob) {
+                StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+                if (!streamingJob.hasPrivilege(userIdentity)) {
+                    continue;
+                }
+            } else {
+                // common insert job
+                if (!hasAdmin) {
+                    continue;
+                }
             }
             List<AbstractTask> tasks = job.queryAllTasks();
             for (AbstractTask task : tasks) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
index b343a7dbeed..31018320595 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
@@ -18,11 +18,9 @@
 package org.apache.doris.tablefunction;
 
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.extensions.insert.InsertTask;
 import org.apache.doris.job.extensions.mtmv.MTMVTask;
-import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TMetaScanRange;
@@ -65,11 +63,6 @@ public class TasksTableValuedFunction extends MetadataTableValuedFunction {
             throw new AnalysisException("Invalid task metadata query");
         }
         this.jobType = jobType;
-        if (jobType != JobType.MV) {
-            if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
-                throw new AnalysisException("only ADMIN priv can operate");
-            }
-        }
     }
 
     public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params)
diff --git a/regression-test/suites/auth_call/test_ddl_job_auth.groovy b/regression-test/suites/auth_call/test_ddl_job_auth.groovy
index bbc554cec30..022309be7ce 100644
--- a/regression-test/suites/auth_call/test_ddl_job_auth.groovy
+++ b/regression-test/suites/auth_call/test_ddl_job_auth.groovy
@@ -24,10 +24,12 @@ suite("test_ddl_job_auth","p0,auth_call") {
     String tableName = 'test_ddl_job_auth_tb'
     String tableNameDst = 'test_ddl_job_auth_tb_dst'
     String jobName = 'test_ddl_job_auth_job'
+    String jobNameTest = 'test_ddl_job_auth_job_test'
 
     try_sql("DROP USER ${user}")
     try_sql """drop database if exists ${dbName}"""
     try_sql("""DROP JOB where jobName='${jobName}';""")
+    try_sql("""DROP JOB where jobName='${jobNameTest}';""")
     sql """CREATE USER '${user}' IDENTIFIED BY '${pwd}'"""
     //cloud-mode
     if (isCloudMode()) {
@@ -51,6 +53,9 @@ suite("test_ddl_job_auth","p0,auth_call") {
             );"""
     sql """create table ${dbName}.${tableNameDst} like 
${dbName}.${tableName}"""
 
+    // for job pause resume drop
+    sql """CREATE JOB ${jobNameTest} ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO 
${dbName}.${tableNameDst} SELECT * FROM ${dbName}.${tableName};"""
+
     // ddl create,show,drop
     connect(user, "${pwd}", context.config.jdbcUrl) {
         test {
@@ -58,24 +63,27 @@ suite("test_ddl_job_auth","p0,auth_call") {
             exception "denied"
         }
         test {
-            sql """PAUSE JOB where jobname='${jobName}';"""
+            sql """PAUSE JOB where jobname='${jobNameTest}';"""
             exception "denied"
         }
         test {
-            sql """RESUME JOB where jobName= '${jobName}';"""
+            sql """RESUME JOB where jobName= '${jobNameTest}';"""
             exception "denied"
         }
 
         test {
-            sql """DROP JOB where jobName='${jobName}';"""
+            sql """DROP JOB where jobName='${jobNameTest}';"""
             exception "denied"
         }
 
-        test {
-            sql """select * from jobs("type"="insert") where Name="${jobName}";"""
-            exception "ADMIN priv"
-        }
+        def res = sql """select * from jobs("type"="insert") where Name="${jobName}";"""
+        assertTrue(res.size() == 0)
     }
+
+    sql """DROP JOB where jobName='${jobNameTest}';"""
+    def resCnt = sql """select * from jobs("type"="insert") where Name="${jobNameTest}";"""
+    assertTrue(resCnt.size() == 0)
+
     sql """grant admin_priv on *.*.* to ${user}"""
     connect(user, "${pwd}", context.config.jdbcUrl) {
         sql """CREATE JOB ${jobName} ON SCHEDULE AT '2100-01-01 00:00:00' DO 
INSERT INTO ${dbName}.${tableNameDst} SELECT * FROM ${dbName}.${tableName};"""
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
index 123c8ab2659..9a4ac438a12 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
@@ -432,6 +432,25 @@ suite("test_streaming_insert_job_crud") {
         """
     }, "The uri property cannot be modified in ALTER JOB")
 
+    // alter target table
+    expectExceptionLike({
+        sql """
+       ALTER JOB ${jobName}
+       INSERT INTO NoExistTable123
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+        """
+    }, "The target table cannot be modified in ALTER JOB")
+
     /************************ delete **********************/
     // drop pause job
     pausedJobStatus = sql """
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_priv.groovy
new file mode 100644
index 00000000000..efe1067a386
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_priv.groovy
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_priv") {
+    def tableName = "test_streaming_insert_job_priv_tbl"
+    def jobName = "test_streaming_insert_job_priv_name"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // create user
+    def user = "test_streaming_insert_job_priv_user"
+    def pwd = '123456'
+    def dbName = context.config.getDbNameByFile(context.file)
+    sql """DROP USER IF EXISTS '${user}'"""
+    sql """CREATE USER '${user}' IDENTIFIED BY '${pwd}'"""
+    def tokens = context.config.jdbcUrl.split('/')
+    def url = tokens[0] + "//" + tokens[2] + "/" + dbName + "?"
+    sql """grant select_priv on ${dbName}.* to ${user}"""
+
+    if (isCloudMode()){
+        // Cloud requires USAGE_PRIV to show clusters.
+        def clusters = sql_return_maparray """show clusters"""
+        log.info("show cluster res: " + clusters)
+        assertNotNull(clusters)
+
+        for (item in clusters) {
+            log.info("cluster item: " + item.is_current + ", " + item.cluster)
+            if (item.is_current.equalsIgnoreCase("TRUE")) {
+                sql """GRANT USAGE_PRIV ON CLUSTER `${item.cluster}` TO 
${user}""";
+                break
+            }
+        }
+    }
+
+    // create job with select priv user
+    connect(user, "${pwd}", url) {
+        expectExceptionLike({
+            sql """
+               CREATE JOB ${jobName}  
+               ON STREAMING DO INSERT INTO ${tableName} 
+               SELECT * FROM S3
+                (
+                    "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                    "format" = "csv",
+                    "provider" = "${getS3Provider()}",
+                    "column_separator" = ",",
+                    "s3.endpoint" = "${getS3Endpoint()}",
+                    "s3.region" = "${getS3Region()}",
+                    "s3.access_key" = "${getS3AK()}",
+                    "s3.secret_key" = "${getS3SK()}"
+                );
+            """
+        }, "LOAD command denied to user")
+    }
+
+    def jobCount = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+    assert jobCount.size() == 0
+
+    // create streaming job by load_priv
+    sql """grant load_priv on ${dbName}.${tableName} to ${user}"""
+    connect(user, "${pwd}", url) {
+        sql """
+           CREATE JOB ${jobName}  
+           PROPERTIES(
+            "s3.max_batch_files" = "1"
+           )
+           ON STREAMING DO INSERT INTO ${tableName} 
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+    }
+
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    log.info("jobSuccendCount: " + jobSuccendCount)
+                    // check job status and succeed task count larger than 2
+                    jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
+                }
+        )
+    } catch (Exception ex){
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex;
+    }
+
+    def jobResult = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+    log.info("show success job: " + jobResult)
+    // revoke load_priv
+    sql """REVOKE load_priv on ${dbName}.${tableName} FROM ${user}"""
+
+    connect(user, "${pwd}", url) {
+        jobCount = sql """SELECT * from jobs("type"="insert") where Name='${jobName}'"""
+        assert jobCount.size() == 0
+
+        expectExceptionLike({
+            sql """PAUSE JOB where jobname =  '${jobName}'"""
+        }, "LOAD command denied to user")
+
+        expectExceptionLike({
+            sql """RESUME JOB where jobname =  '${jobName}'"""
+        }, "LOAD command denied to user")
+
+        expectExceptionLike({
+            sql """DROP JOB where jobname =  '${jobName}'"""
+        }, "LOAD command denied to user")
+    }
+
+    // grant
+    sql """grant load_priv on ${dbName}.${tableName} to ${user}"""
+    connect(user, "${pwd}", url) {
+        jobCount = sql """SELECT * from jobs("type"="insert") where Name='${jobName}'"""
+        assert jobCount.size() == 1
+
+        sql """PAUSE JOB where jobname =  '${jobName}'"""
+        def pausedJobStatus = sql """
+            select status from jobs("type"="insert") where Name='${jobName}'
+        """
+        assert pausedJobStatus.get(0).get(0) == "PAUSED"
+        sql """RESUME JOB where jobname =  '${jobName}'"""
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobRunning = sql """ select status from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("check job running: " + jobRunning)
+                        // check job status running
+                        jobRunning.size() == 1 && jobRunning.get(0).get(0) == 'RUNNING'
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        def resumedJobStatus = sql """
+            select status from jobs("type"="insert") where Name='${jobName}'
+        """
+        assert resumedJobStatus.get(0).get(0) == "RUNNING"
+
+        sql """DROP JOB where jobname =  '${jobName}'"""
+        jobCount = sql """SELECT * from jobs("type"="insert") where Name='${jobName}'"""
+        assert jobCount.size() == 0
+    }
+
+    sql """DROP USER IF EXISTS '${user}'"""
+
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
+
+    def jobCountRsp = sql """select count(1) from jobs("type"="insert")  where Name ='${jobName}'"""
+    assert jobCountRsp.get(0).get(0) == 0
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

