This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new af1b2c19afd branch-4.0: [improve](job) hidden secret key in streaming
job #56742 (#56826)
af1b2c19afd is described below
commit af1b2c19afd6baaa39f98570fb8f146458f548e1
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Oct 12 11:33:42 2025 +0800
branch-4.0: [improve](job) hidden secret key in streaming job #56742
(#56826)
Cherry-picked from #56742
Co-authored-by: wudi <[email protected]>
---
.../org/apache/doris/job/base/AbstractJob.java | 1 +
.../insert/streaming/StreamingInsertJob.java | 51 +++++++++++++++---
.../doris/nereids/parser/LogicalPlanBuilder.java | 2 +-
.../parser/LogicalPlanBuilderForEncryption.java | 20 ++++++++
.../trees/plans/commands/AlterJobCommand.java | 8 ++-
.../trees/plans/commands/CreateJobCommand.java | 6 ++-
.../doris/nereids/parser/EncryptSQLTest.java | 60 ++++++++++++++++++++++
.../streaming_job/test_streaming_insert_job.groovy | 7 ++-
8 files changed, 143 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 6a113d1a042..b24c70cedbc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -97,6 +97,7 @@ public abstract class AbstractJob<T extends AbstractTask, C>
implements Job<T, C
@SerializedName(value = "sql")
String executeSql;
+ String encryptedSql;
@SerializedName(value = "stc")
protected AtomicLong succeedTaskCount = new AtomicLong(0);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 0a9b27e7632..4f2615c4e1e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -24,6 +24,7 @@ import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
@@ -46,7 +47,9 @@ import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.plans.commands.info.BaseViewInfo;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
@@ -72,6 +75,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -166,18 +170,39 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
private UnboundTVFRelation getCurrentTvf() {
- if (baseCommand == null) {
- ConnectContext ctx =
InsertTask.makeConnectContext(getCreateUser(), getCurrentDbName());
- StatementContext statementContext = new StatementContext();
- ctx.setStatementContext(statementContext);
- this.baseCommand = (InsertIntoTableCommand) new
NereidsParser().parseSingle(getExecuteSql());
- }
+ initLogicalPlan(false);
List<UnboundTVFRelation> allTVFRelation =
baseCommand.getAllTVFRelation();
Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support
one source in insert streaming job");
UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
return unboundTVFRelation;
}
+ private void makeConnectContext() {
+ ConnectContext ctx = InsertTask.makeConnectContext(getCreateUser(),
getCurrentDbName());
+ StatementContext statementContext = new StatementContext();
+ ctx.setStatementContext(statementContext);
+ }
+
+ public void initLogicalPlan(boolean regen) {
+ if (regen || baseCommand == null) {
+ this.baseCommand = null;
+ makeConnectContext();
+ LogicalPlan logicalPlan = new
NereidsParser().parseSingle(getExecuteSql());
+ this.baseCommand = (InsertIntoTableCommand) logicalPlan;
+ }
+ }
+
+ /**
+ * When alter updates SQL, it is necessary to
+ * update the command maintained in memory synchronously
+ * @param sql
+ */
+ public void updateExecuteSql(String sql) {
+ setExecuteSql(sql);
+ initLogicalPlan(true);
+ generateEncryptedSql();
+ }
+
@Override
public void updateJobStatus(JobStatus status) throws JobException {
lock.writeLock().lock();
@@ -354,7 +379,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
trow.addToColumnValue(new
TCell().setStringVal(getJobConfig().getExecuteType().name()));
trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
- trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
+
+ trow.addToColumnValue(new
TCell().setStringVal(StringUtils.isNotEmpty(getEncryptedSql())
+ ? getEncryptedSql() : generateEncryptedSql()));
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getSucceedTaskCount().get())));
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
@@ -382,6 +409,16 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
return trow;
}
+ private String generateEncryptedSql() {
+ makeConnectContext();
+ TreeMap<Pair<Integer, Integer>, String> indexInSqlToString = new
TreeMap<>(new Pair.PairComparator<>());
+ new NereidsParser().parseForEncryption(getExecuteSql(),
indexInSqlToString);
+ BaseViewInfo.rewriteSql(indexInSqlToString, getExecuteSql());
+ String encryptSql = BaseViewInfo.rewriteSql(indexInSqlToString,
getExecuteSql());
+ setEncryptedSql(encryptSql);
+ return encryptSql;
+ }
+
@Override
public String formatMsgWhenExecuteQueueFull(Long taskId) {
return commonFormatMsgWhenExecuteQueueFull(taskId,
"streaming_task_queue_size",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 2cb725dc55e..a66351903c9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -1179,7 +1179,7 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
public LogicalPlan visitAlterJob(DorisParser.AlterJobContext ctx) {
Map<String, String> properties = ctx.propertyClause() != null
? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) :
Maps.newHashMap();
- String executeSql = getOriginSql(ctx.supportedDmlStatement());
+ String executeSql = ctx.supportedDmlStatement() != null ?
getOriginSql(ctx.supportedDmlStatement()) : "";
return new AlterJobCommand(ctx.jobName.getText(), properties,
executeSql);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilderForEncryption.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilderForEncryption.java
index e76b7456fc4..c0c7b9df027 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilderForEncryption.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilderForEncryption.java
@@ -21,6 +21,8 @@ import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.nereids.DorisParser;
+import org.apache.doris.nereids.DorisParser.InsertTableContext;
+import org.apache.doris.nereids.DorisParser.SupportedDmlStatementContext;
import org.apache.doris.nereids.trees.plans.commands.info.SetVarOp;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -128,6 +130,24 @@ public class LogicalPlanBuilderForEncryption extends
LogicalPlanBuilder {
return super.visitTableValuedFunction(ctx);
}
+ // create job select tvf
+ @Override
+ public LogicalPlan
visitCreateScheduledJob(DorisParser.CreateScheduledJobContext ctx) {
+ SupportedDmlStatementContext supportedDmlStatementContext =
ctx.supportedDmlStatement();
+ visitInsertTable((InsertTableContext) supportedDmlStatementContext);
+ return super.visitCreateScheduledJob(ctx);
+ }
+
+ // alter job select tvf
+ @Override
+ public LogicalPlan visitAlterJob(DorisParser.AlterJobContext ctx) {
+ SupportedDmlStatementContext supportedDmlStatementContext =
ctx.supportedDmlStatement();
+ if (ctx.supportedDmlStatement() != null) {
+ visitInsertTable((InsertTableContext)
supportedDmlStatementContext);
+ }
+ return super.visitAlterJob(ctx);
+ }
+
private void encryptProperty(Map<String, String> properties, int start,
int stop) {
if (MapUtils.isNotEmpty(properties)) {
PrintableMap<String, String> printableMap = new
PrintableMap<>(properties, "=",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 8d5bd244df7..8d3a4cd6612 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -38,7 +38,7 @@ import java.util.Objects;
/**
* alter job command.
*/
-public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
+public class AlterJobCommand extends AlterCommand implements ForwardWithSync,
NeedAuditEncryption {
// exclude job name prefix, which is used by inner job
private static final String excludeJobNamePrefix = "inner_";
private final String jobName;
@@ -79,7 +79,7 @@ public class AlterJobCommand extends AlterCommand implements
ForwardWithSync {
StreamingInsertJob updateJob = (StreamingInsertJob) job;
// update sql
if (StringUtils.isNotEmpty(sql)) {
- updateJob.setExecuteSql(sql);
+ updateJob.updateExecuteSql(sql);
}
// update properties
if (!properties.isEmpty()) {
@@ -133,4 +133,8 @@ public class AlterJobCommand extends AlterCommand
implements ForwardWithSync {
return false;
}
+ @Override
+ public boolean needAuditEncryption() {
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
index baac59fd7b9..03a423e1b0b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
@@ -47,7 +47,7 @@ import org.apache.doris.qe.StmtExecutor;
* quantity { DAY | HOUR | MINUTE |
* WEEK | SECOND }
*/
-public class CreateJobCommand extends Command implements ForwardWithSync {
+public class CreateJobCommand extends Command implements ForwardWithSync,
NeedAuditEncryption {
private CreateJobInfo createJobInfo;
@@ -82,4 +82,8 @@ public class CreateJobCommand extends Command implements
ForwardWithSync {
return StmtType.CREATE;
}
+ @Override
+ public boolean needAuditEncryption() {
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/EncryptSQLTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/EncryptSQLTest.java
index 90dfecf6ab3..b4e19fa55b5 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/EncryptSQLTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/EncryptSQLTest.java
@@ -256,6 +256,66 @@ public class EncryptSQLTest extends ParserTestBase {
sql = "SET PASSWORD FOR 'admin' = PASSWORD('123456')";
res = "SET PASSWORD FOR 'admin' = PASSWORD('*XXX')";
parseAndCheck(sql, res);
+
+ // create job
+ sql = "CREATE JOB my_job"
+ + " ON STREAMING"
+ + " DO"
+ + " INSERT INTO test.`student`"
+ + " SELECT * FROM S3"
+ + " ("
+ + " \"uri\" = \"s3://bucketname/demo/*.csv\","
+ + " \"format\" = \"csv\","
+ + " \"column_separator\" = \",\","
+ + " \"s3.endpoint\" = \"s3.ap-southeast-1.amazonaws.com\","
+ + " \"s3.region\" = \"ap-southeast-1\","
+ + " \"s3.access_key\" = \"ak\","
+ + " \"s3.secret_key\" = \"abcdefg\""
+ + " );";
+
+ res = "CREATE JOB my_job"
+ + " ON STREAMING"
+ + " DO"
+ + " INSERT INTO test.`student`"
+ + " SELECT * FROM S3"
+ + " ("
+ + " \"uri\" = \"s3://bucketname/demo/*.csv\","
+ + " \"format\" = \"csv\","
+ + " \"column_separator\" = \",\","
+ + " \"s3.endpoint\" = \"s3.ap-southeast-1.amazonaws.com\","
+ + " \"s3.region\" = \"ap-southeast-1\","
+ + " \"s3.access_key\" = \"ak\","
+ + " \"s3.secret_key\" = \"*XXX\""
+ + " );";
+ parseAndCheck(sql, res);
+
+ // alter job
+ sql = "ALTER JOB my_job"
+ + " INSERT INTO test.`student`"
+ + " SELECT * FROM S3"
+ + " ("
+ + " \"uri\" = \"s3://bucketname/demo/*.csv\","
+ + " \"format\" = \"csv\","
+ + " \"column_separator\" = \",\","
+ + " \"s3.endpoint\" = \"s3.ap-southeast-1.amazonaws.com\","
+ + " \"s3.region\" = \"ap-southeast-1\","
+ + " \"s3.access_key\" = \"ak\","
+ + " \"s3.secret_key\" = \"abcdefg\""
+ + " );";
+
+ res = "ALTER JOB my_job"
+ + " INSERT INTO test.`student`"
+ + " SELECT * FROM S3"
+ + " ("
+ + " \"uri\" = \"s3://bucketname/demo/*.csv\","
+ + " \"format\" = \"csv\","
+ + " \"column_separator\" = \",\","
+ + " \"s3.endpoint\" = \"s3.ap-southeast-1.amazonaws.com\","
+ + " \"s3.region\" = \"ap-southeast-1\","
+ + " \"s3.access_key\" = \"ak\","
+ + " \"s3.secret_key\" = \"*XXX\""
+ + " );";
+ parseAndCheck(sql, res);
}
private void parseAndCheck(String sql, String expected) {
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 5695fa31c92..66c46a5e57d 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -95,6 +95,12 @@ suite("test_streaming_insert_job") {
def pauseShowTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
assert pauseShowTask.size() == 0
+ // check encrypt sk
+ def jobExecuteSQL = sql """
+ select ExecuteSql from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert jobExecuteSQL.get(0).get(0).contains("${getS3AK()}")
+ assert !jobExecuteSQL.get(0).get(0).contains("${getS3SK()}")
def jobInfo = sql """
select currentOffset, endoffset, loadStatistic from
jobs("type"="insert") where Name='${jobName}'
@@ -157,5 +163,4 @@ suite("test_streaming_insert_job") {
def jobCountRsp = sql """select count(1) from jobs("type"="insert") where
Name ='${jobName}'"""
assert jobCountRsp.get(0).get(0) == 0
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]