This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9111b8fd4f2 [Opt](load) Support using `AES_ENCRYPT` with internal
custom ENCRYPTKEY in stream load and routine load (#60503)
9111b8fd4f2 is described below
commit 9111b8fd4f2b5fb957a7a944bd6607368eb9c4cc
Author: bobhan1 <[email protected]>
AuthorDate: Wed Feb 25 20:21:02 2026 +0800
[Opt](load) Support using `AES_ENCRYPT` with internal custom ENCRYPTKEY in
stream load and routine load (#60503)
### What problem does this PR solve?
## Summary
Support using `CREATE ENCRYPTKEY` defined custom keys (via `KEY
db.keyname` syntax) in the `columns` parameter of Stream Load and
Routine Load.
**Problem:**
Previously, `EncryptKeyRef` was only folded to the actual key string
during the `FoldConstantRule` phase, which was skipped in load planning
(`setDebugSkipFoldConstant(true)`), causing a `Could not find function
encryptkeyref` error on BE.
**Solution:**
- Add `RewriteEncryptKeyRef` rule in `NereidsLoadUtils` to fold
`EncryptKeyRef` before `ExpressionNormalization`, bypassing the
skip-fold-constant flag
- Fix Routine Load privilege check failure by setting
`CurrentUserIdentity` in non-cloud mode during `RoutineLoadJob.plan()`
## Changes
### FE Core
- **NereidsLoadUtils.java**: Add `RewriteEncryptKeyRef` inner class that
directly invokes `FoldConstantRuleOnFE.VISITOR_INSTANCE` to fold
`EncryptKeyRef` to `StringLiteral`, placed before
`ExpressionNormalization` in the analyzer pipeline
- **RoutineLoadJob.java**: Call `ConnectContext.setCurrentUserIdentity()`
in the non-cloud mode path of the `plan()` method, so that privilege checks
during expression rewrite have the proper user identity
### Regression Tests
- **test_stream_load_with_aes_encrypt.groovy**: Tests `AES_ENCRYPT` in
Stream Load columns with both direct key string and `KEY db.keyname`
syntax
- **test_routine_load_with_aes_encrypt.groovy**: Tests `AES_ENCRYPT` in
Routine Load columns with both direct key string and `KEY db.keyname`
syntax
## Example Usage
```sql
-- Create custom encrypt key
CREATE ENCRYPTKEY my_key AS "ABCD123456789";
-- Stream Load with ENCRYPTKEY in columns parameter
curl --location-trusted -u root: \
-H "columns: id, name, tmp_data,
encrypted_data=TO_BASE64(AES_ENCRYPT(tmp_data, KEY db.my_key))" \
-T data.csv \
http://host:port/api/db/table/_stream_load
-- Routine Load with ENCRYPTKEY in columns parameter
CREATE ROUTINE LOAD job ON table
COLUMNS(id, name, tmp_data,
encrypted_data=TO_BASE64(AES_ENCRYPT(tmp_data, KEY db.my_key)))
FROM KAFKA (...);
```
## Test Plan
- Run test_stream_load_with_aes_encrypt to verify Stream Load with
AES_ENCRYPT using both direct key and ENCRYPTKEY
- Run test_routine_load_with_aes_encrypt to verify Routine Load with
AES_ENCRYPT using both direct key and ENCRYPTKEY
- Verify decrypted data matches original plaintext
- Verify existing Stream Load and Routine Load tests are not affected
---
.../doris/load/routineload/RoutineLoadJob.java | 2 +
.../doris/nereids/load/NereidsLoadUtils.java | 32 ++++
.../test_routine_load_with_aes_encrypt.out | 22 +++
.../stream_load/test_stream_load_aes_encrypt.csv | 3 +
.../test_stream_load_with_aes_encrypt.out | 17 ++
.../test_routine_load_with_aes_encrypt.groovy | 175 +++++++++++++++++++++
.../test_stream_load_with_aes_encrypt.groovy | 116 ++++++++++++++
7 files changed, 367 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index f036c3ba192..1c0a652dc56 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1056,6 +1056,8 @@ public abstract class RoutineLoadJob
ConnectContext.get().setCurrentUserIdentity(this.getUserIdentity());
} else {
setComputeGroup();
+ // Set user identity for privilege check in expression rewrite
(e.g., EncryptKeyRef)
+
ConnectContext.get().setCurrentUserIdentity(this.getUserIdentity());
}
if (ConnectContext.get().getEnv() == null) {
ConnectContext.get().setEnv(Env.getCurrentEnv());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java
index 5cdf6ab074b..72ad3642d35 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadUtils.java
@@ -37,6 +37,8 @@ import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.analysis.BindExpression;
import org.apache.doris.nereids.rules.analysis.BindSink;
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
+import org.apache.doris.nereids.rules.expression.ExpressionRewrite;
+import org.apache.doris.nereids.rules.expression.rules.FoldConstantRuleOnFE;
import org.apache.doris.nereids.rules.rewrite.MergeProjects;
import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
import org.apache.doris.nereids.trees.expressions.Alias;
@@ -45,6 +47,7 @@ import
org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import
org.apache.doris.nereids.trees.expressions.functions.scalar.EncryptKeyRef;
import
org.apache.doris.nereids.trees.expressions.functions.scalar.JsonbParseErrorToNull;
import
org.apache.doris.nereids.trees.expressions.functions.scalar.JsonbParseErrorToValue;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
@@ -236,6 +239,11 @@ public class NereidsLoadUtils {
// the NereidsLoadPlanInfoCollector will not generate
slot by id#0,
// so we must use MergeProjects here
new MergeProjects(),
+ // RewriteEncryptKeyRef must be placed before
ExpressionNormalization,
+ // because setDebugSkipFoldConstant(true) will skip
FoldConstantRule which
+ // is responsible for folding EncryptKeyRef to
StringLiteral.
+ // We need to handle EncryptKeyRef separately to support
KEY syntax in stream load.
+ new RewriteEncryptKeyRef(),
new ExpressionNormalization())
)).execute();
Rewriter.getWholeTreeRewriterWithCustomJobs(cascadesContext,
ImmutableList.of()).execute();
@@ -371,4 +379,28 @@ public class NereidsLoadUtils {
}).toRule(RuleType.ADD_POST_PROJECT_FOR_LOAD);
}
}
+
+ /**
+ * RewriteEncryptKeyRef
+ * This rule rewrites EncryptKeyRef to StringLiteral in stream load.
+ * Since setDebugSkipFoldConstant(true) is set during stream load planning,
+ * FoldConstantRule will be skipped and EncryptKeyRef won't be folded.
+ * This rule handles EncryptKeyRef separately to support KEY syntax in
stream load columns parameter.
+ */
+ private static class RewriteEncryptKeyRef extends ExpressionRewrite {
+ private static final FoldConstantRuleOnFE FOLD_ENCRYPT_KEY_REF =
FoldConstantRuleOnFE.VISITOR_INSTANCE;
+
+ public RewriteEncryptKeyRef() {
+ super(((expression, context) -> {
+ // Use rewriteUp to traverse the expression tree bottom-up and
only fold
+ // EncryptKeyRef nodes to StringLiteral, leaving all other
expressions unchanged.
+ return expression.rewriteUp(e -> {
+ if (e instanceof EncryptKeyRef) {
+ return e.accept(FOLD_ENCRYPT_KEY_REF, context);
+ }
+ return e;
+ });
+ }));
+ }
+ }
}
diff --git
a/regression-test/data/load_p0/routine_load/test_routine_load_with_aes_encrypt.out
b/regression-test/data/load_p0/routine_load/test_routine_load_with_aes_encrypt.out
new file mode 100644
index 00000000000..88041b9e162
--- /dev/null
+++
b/regression-test/data/load_p0/routine_load/test_routine_load_with_aes_encrypt.out
@@ -0,0 +1,22 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_count_direct --
+3
+
+-- !sql_decrypt_direct --
+1 Alice hello world
+2 Bob doris is great
+3 Charlie test encryption
+
+-- !sql_count_encryptkey --
+3
+
+-- !sql_decrypt_with_encryptkey --
+1 Alice hello world
+2 Bob doris is great
+3 Charlie test encryption
+
+-- !sql_decrypt_with_direct_key --
+1 Alice hello world
+2 Bob doris is great
+3 Charlie test encryption
+
diff --git
a/regression-test/data/load_p0/stream_load/test_stream_load_aes_encrypt.csv
b/regression-test/data/load_p0/stream_load/test_stream_load_aes_encrypt.csv
new file mode 100644
index 00000000000..75288201e1a
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_aes_encrypt.csv
@@ -0,0 +1,3 @@
+1,Alice,hello world
+2,Bob,doris is great
+3,Charlie,test encryption
diff --git
a/regression-test/data/load_p0/stream_load/test_stream_load_with_aes_encrypt.out
b/regression-test/data/load_p0/stream_load/test_stream_load_with_aes_encrypt.out
new file mode 100644
index 00000000000..3be01d99d26
--- /dev/null
+++
b/regression-test/data/load_p0/stream_load/test_stream_load_with_aes_encrypt.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_count --
+3
+
+-- !sql_decrypt_direct --
+1 Alice hello world
+2 Bob doris is great
+3 Charlie test encryption
+
+-- !sql_count_key --
+3
+
+-- !sql_decrypt_with_key --
+1 Alice hello world
+2 Bob doris is great
+3 Charlie test encryption
+
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_with_aes_encrypt.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_aes_encrypt.groovy
new file mode 100644
index 00000000000..2d87a9b3626
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_with_aes_encrypt.groovy
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_with_aes_encrypt", "p0") {
+ def tableName1 = "test_routine_load_aes_encrypt_direct_key"
+ def tableName2 = "test_routine_load_aes_encrypt_with_encryptkey"
+ def kafkaTopic1 = "test_routine_load_aes_encrypt_direct"
+ def kafkaTopic2 = "test_routine_load_aes_encrypt_key"
+ def jobName1 = "routine_load_aes_direct_key_job"
+ def jobName2 = "routine_load_aes_encryptkey_job"
+ def dbName = context.dbName
+ def encryptKeyName = "routine_load_aes_key"
+ def aesKey = "F3229A0B371ED2D9441B830D21A390C3"
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ // Test data
+ def testData = [
+ "1,Alice,hello world",
+ "2,Bob,doris is great",
+ "3,Charlie,test encryption"
+ ]
+
+ // Send test data to both Kafka topics
+ [kafkaTopic1, kafkaTopic2].each { topic ->
+ testData.each { line ->
+ logger.info("Sending to Kafka topic ${topic}: ${line}")
+ def record = new ProducerRecord<>(topic, null, line)
+ producer.send(record).get()
+ }
+ }
+ producer.flush()
+
+ // ============================================================
+ // Test 1: AES_ENCRYPT with direct key string
+ // ============================================================
+ log.info("Test 1: Routine load with AES_ENCRYPT using direct key
string")
+
+ sql """ DROP TABLE IF EXISTS ${tableName1} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName1} (
+ `id` int(11) NULL,
+ `name` string NULL,
+ `encrypted_data` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName1} ON ${tableName1}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS(id, name, tmp_data,
encrypted_data=TO_BASE64(AES_ENCRYPT(tmp_data, '${aesKey}')))
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaTopic1}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, jobName1,
tableName1, 2)
+
+ // Verify data count
+ qt_sql_count_direct """ SELECT count(*) FROM ${tableName1} """
+
+ // Verify encrypted data can be decrypted
+ qt_sql_decrypt_direct """
+ SELECT id, name, AES_DECRYPT(FROM_BASE64(encrypted_data),
'${aesKey}') as decrypted_data
+ FROM ${tableName1}
+ ORDER BY id
+ """
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${jobName1}"
+ sql """ DROP TABLE IF EXISTS ${tableName1} """
+ }
+
+ // ============================================================
+ // Test 2: AES_ENCRYPT with ENCRYPTKEY (KEY syntax)
+ // ============================================================
+ log.info("Test 2: Routine load with AES_ENCRYPT using ENCRYPTKEY (KEY
syntax)")
+
+ sql """ DROP TABLE IF EXISTS ${tableName2} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName2} (
+ `id` int(11) NULL,
+ `name` string NULL,
+ `encrypted_data` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // Create encryptkey
+ try_sql """ DROP ENCRYPTKEY IF EXISTS ${encryptKeyName} """
+ sql """ CREATE ENCRYPTKEY ${encryptKeyName} AS "${aesKey}" """
+
+ // Verify encryptkey was created
+ def keyRes = sql """ SHOW ENCRYPTKEYS FROM ${dbName} """
+ log.info("Encryptkeys: ${keyRes}")
+ assertTrue(keyRes.size() >= 1, "Encryptkey should be created")
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName2} ON ${tableName2}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS(id, name, tmp_data,
encrypted_data=TO_BASE64(AES_ENCRYPT(tmp_data, KEY
${dbName}.${encryptKeyName})))
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaTopic2}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, jobName2,
tableName2, 2)
+
+ // Verify data count
+ qt_sql_count_encryptkey """ SELECT count(*) FROM ${tableName2} """
+
+ // Verify encrypted data can be decrypted using ENCRYPTKEY
+ qt_sql_decrypt_with_encryptkey """
+ SELECT id, name, AES_DECRYPT(FROM_BASE64(encrypted_data), KEY
${dbName}.${encryptKeyName}) as decrypted_data
+ FROM ${tableName2}
+ ORDER BY id
+ """
+
+ // Verify encrypted data can also be decrypted using direct key
string (same key value)
+ qt_sql_decrypt_with_direct_key """
+ SELECT id, name, AES_DECRYPT(FROM_BASE64(encrypted_data),
'${aesKey}') as decrypted_data
+ FROM ${tableName2}
+ ORDER BY id
+ """
+ } finally {
+ sql "STOP ROUTINE LOAD FOR ${jobName2}"
+ sql """ DROP TABLE IF EXISTS ${tableName2} """
+ try_sql """ DROP ENCRYPTKEY IF EXISTS ${encryptKeyName} """
+ }
+ }
+}
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_with_aes_encrypt.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_with_aes_encrypt.groovy
new file mode 100644
index 00000000000..1ef961529a0
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_with_aes_encrypt.groovy
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_with_aes_encrypt", "p0") {
+ def tableName = "test_stream_load_with_aes_encrypt"
+ def dbName = context.dbName
+ def encryptKeyName = "my_stream_load_key"
+ def aesKey = "F3229A0B371ED2D9441B830D21A390C3"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ CREATE TABLE ${tableName} (
+ id int,
+ name string,
+ encrypted_data string
+ ) ENGINE=OLAP
+ DUPLICATE KEY (`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ // Test 1: Stream load with AES_ENCRYPT using direct key string
+ log.info("Test 1: Stream load with AES_ENCRYPT using direct key string")
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', """ id, name, tmp_data,
encrypted_data=TO_BASE64(AES_ENCRYPT(tmp_data, '${aesKey}')) """
+ file 'test_stream_load_aes_encrypt.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(3, json.NumberLoadedRows)
+ }
+ }
+
+ sql """sync"""
+
+ // Verify data was loaded
+ qt_sql_count """ SELECT count(*) FROM ${tableName} """
+
+ // Verify encrypted data can be decrypted back to original using direct key
+ qt_sql_decrypt_direct """
+ SELECT id, name, AES_DECRYPT(FROM_BASE64(encrypted_data), '${aesKey}')
as decrypted_data
+ FROM ${tableName}
+ ORDER BY id
+ """
+
+ // Clean table for next test
+ sql """ TRUNCATE TABLE ${tableName} """
+
+ // Test 2: Stream load with AES_ENCRYPT using ENCRYPTKEY (KEY syntax)
+ log.info("Test 2: Stream load with AES_ENCRYPT using ENCRYPTKEY (KEY
syntax)")
+
+ // Create encryptkey with the same key value
+ try_sql """ DROP ENCRYPTKEY IF EXISTS ${encryptKeyName} """
+ sql """ CREATE ENCRYPTKEY ${encryptKeyName} AS "${aesKey}" """
+
+ // Verify encryptkey was created
+ def keyRes = sql """ SHOW ENCRYPTKEYS FROM ${dbName} """
+ log.info("Encryptkeys: ${keyRes}")
+ assertTrue(keyRes.size() >= 1, "Encryptkey should be created")
+
+ // Stream load using KEY syntax to reference encryptkey
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', """ id, name, tmp_data,
encrypted_data=TO_BASE64(AES_ENCRYPT(tmp_data, KEY
${dbName}.${encryptKeyName})) """
+ file 'test_stream_load_aes_encrypt.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result with ENCRYPTKEY:
${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(3, json.NumberLoadedRows)
+ }
+ }
+
+ sql """sync"""
+
+ // Verify data count
+ qt_sql_count_key """ SELECT count(*) FROM ${tableName} """
+
+ // Verify encrypted data can be decrypted using ENCRYPTKEY
+ qt_sql_decrypt_with_key """
+ SELECT id, name, AES_DECRYPT(FROM_BASE64(encrypted_data), KEY
${dbName}.${encryptKeyName}) as decrypted_data
+ FROM ${tableName}
+ ORDER BY id
+ """
+
+ // Cleanup
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ try_sql """ DROP ENCRYPTKEY IF EXISTS ${encryptKeyName} """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]