This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new af1b2c19afd branch-4.0: [improve](job) hidden secret key in streaming 
job #56742 (#56826)
af1b2c19afd is described below

commit af1b2c19afd6baaa39f98570fb8f146458f548e1
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Oct 12 11:33:42 2025 +0800

    branch-4.0: [improve](job) hidden secret key in streaming job #56742 
(#56826)
    
    Cherry-picked from #56742
    
    Co-authored-by: wudi <[email protected]>
---
 .../org/apache/doris/job/base/AbstractJob.java     |  1 +
 .../insert/streaming/StreamingInsertJob.java       | 51 +++++++++++++++---
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  2 +-
 .../parser/LogicalPlanBuilderForEncryption.java    | 20 ++++++++
 .../trees/plans/commands/AlterJobCommand.java      |  8 ++-
 .../trees/plans/commands/CreateJobCommand.java     |  6 ++-
 .../doris/nereids/parser/EncryptSQLTest.java       | 60 ++++++++++++++++++++++
 .../streaming_job/test_streaming_insert_job.groovy |  7 ++-
 8 files changed, 143 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 6a113d1a042..b24c70cedbc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -97,6 +97,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> 
implements Job<T, C
     @SerializedName(value = "sql")
     String executeSql;
 
+    String encryptedSql;
 
     @SerializedName(value = "stc")
     protected AtomicLong succeedTaskCount = new AtomicLong(0);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 0a9b27e7632..4f2615c4e1e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -24,6 +24,7 @@ import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
@@ -46,7 +47,9 @@ import org.apache.doris.load.loadv2.LoadStatistic;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.plans.commands.info.BaseViewInfo;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
@@ -72,6 +75,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -166,18 +170,39 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     }
 
     private UnboundTVFRelation getCurrentTvf() {
-        if (baseCommand == null) {
-            ConnectContext ctx = 
InsertTask.makeConnectContext(getCreateUser(), getCurrentDbName());
-            StatementContext statementContext = new StatementContext();
-            ctx.setStatementContext(statementContext);
-            this.baseCommand = (InsertIntoTableCommand) new 
NereidsParser().parseSingle(getExecuteSql());
-        }
+        initLogicalPlan(false);
         List<UnboundTVFRelation> allTVFRelation = 
baseCommand.getAllTVFRelation();
         Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support 
one source in insert streaming job");
         UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
         return unboundTVFRelation;
     }
 
+    private void makeConnectContext() {
+        ConnectContext ctx = InsertTask.makeConnectContext(getCreateUser(), 
getCurrentDbName());
+        StatementContext statementContext = new StatementContext();
+        ctx.setStatementContext(statementContext);
+    }
+
+    public void initLogicalPlan(boolean regen) {
+        if (regen || baseCommand == null) {
+            this.baseCommand = null;
+            makeConnectContext();
+            LogicalPlan logicalPlan = new 
NereidsParser().parseSingle(getExecuteSql());
+            this.baseCommand = (InsertIntoTableCommand) logicalPlan;
+        }
+    }
+
+    /**
+     * When alter updates SQL, it is necessary to
+     * update the command maintained in memory synchronously
+     * @param sql
+     */
+    public void updateExecuteSql(String sql) {
+        setExecuteSql(sql);
+        initLogicalPlan(true);
+        generateEncryptedSql();
+    }
+
     @Override
     public void updateJobStatus(JobStatus status) throws JobException {
         lock.writeLock().lock();
@@ -354,7 +379,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         trow.addToColumnValue(new 
TCell().setStringVal(getJobConfig().getExecuteType().name()));
         trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
-        trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
+
+        trow.addToColumnValue(new 
TCell().setStringVal(StringUtils.isNotEmpty(getEncryptedSql())
+                ? getEncryptedSql() : generateEncryptedSql()));
         trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getSucceedTaskCount().get())));
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
@@ -382,6 +409,16 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         return trow;
     }
 
+    private String generateEncryptedSql() {
+        makeConnectContext();
+        TreeMap<Pair<Integer, Integer>, String> indexInSqlToString = new 
TreeMap<>(new Pair.PairComparator<>());
+        new NereidsParser().parseForEncryption(getExecuteSql(), 
indexInSqlToString);
+        BaseViewInfo.rewriteSql(indexInSqlToString, getExecuteSql());
+        String encryptSql = BaseViewInfo.rewriteSql(indexInSqlToString, 
getExecuteSql());
+        setEncryptedSql(encryptSql);
+        return encryptSql;
+    }
+
     @Override
     public String formatMsgWhenExecuteQueueFull(Long taskId) {
         return commonFormatMsgWhenExecuteQueueFull(taskId, 
"streaming_task_queue_size",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 2cb725dc55e..a66351903c9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -1179,7 +1179,7 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
     public LogicalPlan visitAlterJob(DorisParser.AlterJobContext ctx) {
         Map<String, String> properties = ctx.propertyClause() != null
                 ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : 
Maps.newHashMap();
-        String executeSql = getOriginSql(ctx.supportedDmlStatement());
+        String executeSql = ctx.supportedDmlStatement() != null ? 
getOriginSql(ctx.supportedDmlStatement()) : "";
         return new AlterJobCommand(ctx.jobName.getText(), properties, 
executeSql);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilderForEncryption.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilderForEncryption.java
index e76b7456fc4..c0c7b9df027 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilderForEncryption.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilderForEncryption.java
@@ -21,6 +21,8 @@ import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.nereids.DorisParser;
+import org.apache.doris.nereids.DorisParser.InsertTableContext;
+import org.apache.doris.nereids.DorisParser.SupportedDmlStatementContext;
 import org.apache.doris.nereids.trees.plans.commands.info.SetVarOp;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 
@@ -128,6 +130,24 @@ public class LogicalPlanBuilderForEncryption extends 
LogicalPlanBuilder {
         return super.visitTableValuedFunction(ctx);
     }
 
+    // create job select tvf
+    @Override
+    public LogicalPlan 
visitCreateScheduledJob(DorisParser.CreateScheduledJobContext ctx) {
+        SupportedDmlStatementContext supportedDmlStatementContext = 
ctx.supportedDmlStatement();
+        visitInsertTable((InsertTableContext) supportedDmlStatementContext);
+        return super.visitCreateScheduledJob(ctx);
+    }
+
+    // alter job select tvf
+    @Override
+    public LogicalPlan visitAlterJob(DorisParser.AlterJobContext ctx) {
+        SupportedDmlStatementContext supportedDmlStatementContext = 
ctx.supportedDmlStatement();
+        if (ctx.supportedDmlStatement() != null) {
+            visitInsertTable((InsertTableContext) 
supportedDmlStatementContext);
+        }
+        return super.visitAlterJob(ctx);
+    }
+
     private void encryptProperty(Map<String, String> properties, int start, 
int stop) {
         if (MapUtils.isNotEmpty(properties)) {
             PrintableMap<String, String> printableMap = new 
PrintableMap<>(properties, "=",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 8d5bd244df7..8d3a4cd6612 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -38,7 +38,7 @@ import java.util.Objects;
 /**
  * alter job command.
  */
-public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
+public class AlterJobCommand extends AlterCommand implements ForwardWithSync, 
NeedAuditEncryption {
     // exclude job name prefix, which is used by inner job
     private static final String excludeJobNamePrefix = "inner_";
     private final String jobName;
@@ -79,7 +79,7 @@ public class AlterJobCommand extends AlterCommand implements 
ForwardWithSync {
             StreamingInsertJob updateJob = (StreamingInsertJob) job;
             // update sql
             if (StringUtils.isNotEmpty(sql)) {
-                updateJob.setExecuteSql(sql);
+                updateJob.updateExecuteSql(sql);
             }
             // update properties
             if (!properties.isEmpty()) {
@@ -133,4 +133,8 @@ public class AlterJobCommand extends AlterCommand 
implements ForwardWithSync {
         return false;
     }
 
+    @Override
+    public boolean needAuditEncryption() {
+        return true;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
index baac59fd7b9..03a423e1b0b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
@@ -47,7 +47,7 @@ import org.apache.doris.qe.StmtExecutor;
  * quantity { DAY | HOUR | MINUTE |
  * WEEK | SECOND }
  */
-public class CreateJobCommand extends Command implements ForwardWithSync {
+public class CreateJobCommand extends Command implements ForwardWithSync, 
NeedAuditEncryption {
 
     private CreateJobInfo createJobInfo;
 
@@ -82,4 +82,8 @@ public class CreateJobCommand extends Command implements 
ForwardWithSync {
         return StmtType.CREATE;
     }
 
+    @Override
+    public boolean needAuditEncryption() {
+        return true;
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/EncryptSQLTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/EncryptSQLTest.java
index 90dfecf6ab3..b4e19fa55b5 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/EncryptSQLTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/EncryptSQLTest.java
@@ -256,6 +256,66 @@ public class EncryptSQLTest extends ParserTestBase {
         sql = "SET PASSWORD FOR 'admin' = PASSWORD('123456')";
         res = "SET PASSWORD FOR 'admin' = PASSWORD('*XXX')";
         parseAndCheck(sql, res);
+
+        // create job
+        sql = "CREATE JOB my_job"
+                + " ON STREAMING"
+                + " DO"
+                + " INSERT INTO test.`student`"
+                + " SELECT * FROM S3"
+                + " ("
+                + " \"uri\" = \"s3://bucketname/demo/*.csv\","
+                + " \"format\" = \"csv\","
+                + " \"column_separator\" = \",\","
+                + " \"s3.endpoint\" = \"s3.ap-southeast-1.amazonaws.com\","
+                + " \"s3.region\" = \"ap-southeast-1\","
+                + " \"s3.access_key\" = \"ak\","
+                + " \"s3.secret_key\" = \"abcdefg\""
+                + " );";
+
+        res = "CREATE JOB my_job"
+                + " ON STREAMING"
+                + " DO"
+                + " INSERT INTO test.`student`"
+                + " SELECT * FROM S3"
+                + " ("
+                + " \"uri\" = \"s3://bucketname/demo/*.csv\","
+                + " \"format\" = \"csv\","
+                + " \"column_separator\" = \",\","
+                + " \"s3.endpoint\" = \"s3.ap-southeast-1.amazonaws.com\","
+                + " \"s3.region\" = \"ap-southeast-1\","
+                + " \"s3.access_key\" = \"ak\","
+                + " \"s3.secret_key\" = \"*XXX\""
+                + " );";
+        parseAndCheck(sql, res);
+
+        // alter job
+        sql = "ALTER JOB my_job"
+                + " INSERT INTO test.`student`"
+                + " SELECT * FROM S3"
+                + " ("
+                + " \"uri\" = \"s3://bucketname/demo/*.csv\","
+                + " \"format\" = \"csv\","
+                + " \"column_separator\" = \",\","
+                + " \"s3.endpoint\" = \"s3.ap-southeast-1.amazonaws.com\","
+                + " \"s3.region\" = \"ap-southeast-1\","
+                + " \"s3.access_key\" = \"ak\","
+                + " \"s3.secret_key\" = \"abcdefg\""
+                + " );";
+
+        res = "ALTER JOB my_job"
+                + " INSERT INTO test.`student`"
+                + " SELECT * FROM S3"
+                + " ("
+                + " \"uri\" = \"s3://bucketname/demo/*.csv\","
+                + " \"format\" = \"csv\","
+                + " \"column_separator\" = \",\","
+                + " \"s3.endpoint\" = \"s3.ap-southeast-1.amazonaws.com\","
+                + " \"s3.region\" = \"ap-southeast-1\","
+                + " \"s3.access_key\" = \"ak\","
+                + " \"s3.secret_key\" = \"*XXX\""
+                + " );";
+        parseAndCheck(sql, res);
     }
 
     private void parseAndCheck(String sql, String expected) {
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 5695fa31c92..66c46a5e57d 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -95,6 +95,12 @@ suite("test_streaming_insert_job") {
     def pauseShowTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
     assert pauseShowTask.size() == 0
 
+    // check encrypt sk
+    def jobExecuteSQL = sql """
+        select ExecuteSql from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert jobExecuteSQL.get(0).get(0).contains("${getS3AK()}")
+    assert !jobExecuteSQL.get(0).get(0).contains("${getS3SK()}")
 
     def jobInfo = sql """
         select currentOffset, endoffset, loadStatistic from 
jobs("type"="insert") where Name='${jobName}'
@@ -157,5 +163,4 @@ suite("test_streaming_insert_job") {
 
     def jobCountRsp = sql """select count(1) from jobs("type"="insert")  where 
Name ='${jobName}'"""
     assert jobCountRsp.get(0).get(0) == 0
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to