This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 3806e408523 branch-4.1: [feature](cache) support file cache admission
control #59065 (#61547)
3806e408523 is described below
commit 3806e4085233fdbc8e4530f090669c07355a4df1
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Mar 21 10:56:26 2026 +0800
branch-4.1: [feature](cache) support file cache admission control #59065
(#61547)
Cherry-picked from #59065
Co-authored-by: Wen Zhenghu <[email protected]>
Co-authored-by: xuchenhao <[email protected]>
Co-authored-by: xuchenhao <[email protected]>
Co-authored-by: morningman <[email protected]>
---
be/src/io/file_factory.cpp | 2 +-
be/src/io/file_factory.h | 1 +
be/src/vec/exec/format/csv/csv_reader.cpp | 5 +-
be/src/vec/exec/format/json/new_json_reader.cpp | 3 +
be/src/vec/exec/format/orc/vorc_reader.cpp | 3 +
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 3 +
.../main/java/org/apache/doris/common/Config.java | 10 +
.../src/main/java/org/apache/doris/DorisFE.java | 5 +
.../org/apache/doris/common/ConfigWatcher.java | 53 +-
.../datasource/FileCacheAdmissionManager.java | 721 +++++++++++++++++++++
.../apache/doris/datasource/FileQueryScanNode.java | 80 ++-
.../org/apache/doris/datasource/FileScanNode.java | 16 +
.../apache/doris/datasource/SplitAssignment.java | 8 +-
.../apache/doris/datasource/SplitToScanRange.java | 3 +-
.../doris/nereids/parser/LogicalPlanBuilder.java | 51 ++
.../java/org/apache/doris/planner/ScanNode.java | 4 +
.../datasource/FileCacheAdmissionManagerTest.java | 437 +++++++++++++
.../FileCacheAdmissionRuleRefresherTest.java | 247 +++++++
.../doris/datasource/SplitAssignmentTest.java | 27 +-
gensrc/thrift/PlanNodes.thrift | 1 +
tools/export_mysql_rule_to_json.sh | 83 +++
21 files changed, 1742 insertions(+), 21 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 7b1821c0fa8..d42d1f40969 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -67,7 +67,7 @@ io::FileReaderOptions
FileFactory::get_reader_options(RuntimeState* state,
};
if (config::enable_file_cache && state != nullptr &&
state->query_options().__isset.enable_file_cache &&
- state->query_options().enable_file_cache) {
+ state->query_options().enable_file_cache && fd.file_cache_admission) {
opts.cache_type = io::FileCachePolicy::FILE_BLOCK_CACHE;
}
if (state != nullptr &&
state->query_options().__isset.file_cache_base_path &&
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 61e322ca0af..4966137508d 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -71,6 +71,7 @@ struct FileDescription {
// because for a hive table, differenet partitions may have different
// locations(or fs), so different files may have different fs.
std::string fs_name;
+ bool file_cache_admission = true;
};
} // namespace io
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index e27f100a912..e6821d48296 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -1,4 +1,4 @@
-// Licensed to the Apache Software Foundation (ASF) under one
+// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@@ -223,6 +223,9 @@ void CsvReader::_init_file_description() {
if (_range.__isset.fs_name) {
_file_description.fs_name = _range.fs_name;
}
+ if (_range.__isset.file_cache_admission) {
+ _file_description.file_cache_admission = _range.file_cache_admission;
+ }
}
Status CsvReader::init_reader(bool is_load) {
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 3fd33053811..0e6d3c840e6 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -164,6 +164,9 @@ void NewJsonReader::_init_file_description() {
if (_range.__isset.fs_name) {
_file_description.fs_name = _range.fs_name;
}
+ if (_range.__isset.file_cache_admission) {
+ _file_description.file_cache_admission = _range.file_cache_admission;
+ }
}
Status NewJsonReader::init_reader(
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 4169aacc4ba..8dc250dabe3 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -1431,6 +1431,9 @@ void OrcReader::_init_file_description() {
if (_scan_range.__isset.fs_name) {
_file_description.fs_name = _scan_range.fs_name;
}
+ if (_scan_range.__isset.file_cache_admission) {
+ _file_description.file_cache_admission =
_scan_range.file_cache_admission;
+ }
}
DataTypePtr OrcReader::convert_to_doris_type(const orc::Type* orc_type) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index de99ad2170c..1cc129d6a7e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -342,6 +342,9 @@ void ParquetReader::_init_file_description() {
if (_scan_range.__isset.fs_name) {
_file_description.fs_name = _scan_range.fs_name;
}
+ if (_scan_range.__isset.file_cache_admission) {
+ _file_description.file_cache_admission =
_scan_range.file_cache_admission;
+ }
}
Status ParquetReader::init_reader(
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0c1aa6d882e..124b87c506d 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3752,6 +3752,16 @@ public class Config extends ConfigBase {
})
public static int first_error_msg_max_length = 256;
+ @ConfField(mutable = false, description = {
+ "Whether to enable file cache admission control(Blocklist and
Allowlist)"
+ })
+ public static boolean enable_file_cache_admission_control = false;
+
+ @ConfField(mutable = false, description = {
+ "Directory path for storing admission rules JSON files"
+ })
+ public static String file_cache_admission_control_json_dir = "";
+
@ConfField
public static String cloud_snapshot_handler_class =
"org.apache.doris.cloud.snapshot.CloudSnapshotHandler";
@ConfField
diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
index 73742497226..a52e554bdf7 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.lock.DeadlockMonitor;
import org.apache.doris.common.util.JdkUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.FileCacheAdmissionManager;
import org.apache.doris.httpv2.HttpServer;
import org.apache.doris.journal.bdbje.BDBDebugger;
import org.apache.doris.journal.bdbje.BDBTool;
@@ -219,6 +220,10 @@ public class DorisFE {
Env.getCurrentEnv().initialize(args);
Env.getCurrentEnv().waitForReady();
+ if (Config.enable_file_cache_admission_control) {
+ FileCacheAdmissionManager.getInstance().loadOnStartup();
+ }
+
// init and start:
// 1. HttpServer for HTTP Server
// 2. FeServer for Thrift Server
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/ConfigWatcher.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ConfigWatcher.java
index 65530b91f7d..27fb11bfe88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ConfigWatcher.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ConfigWatcher.java
@@ -32,6 +32,7 @@ import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.function.Consumer;
/*
* used for watch config changed
@@ -41,6 +42,10 @@ public class ConfigWatcher extends Daemon {
public final Path configPath;
+ private Consumer<Path> onCreateConsumer = null;
+ private Consumer<Path> onModifyConsumer = null;
+ private Consumer<Path> onDeleteConsumer = null;
+
public ConfigWatcher(String configPathStr) {
super("config watcher");
Preconditions.checkState(!Strings.isNullOrEmpty(configPathStr));
@@ -95,16 +100,58 @@ public class ConfigWatcher extends Daemon {
}
}
+ public void setOnCreateConsumer(Consumer<Path> consumer) {
+ this.onCreateConsumer = consumer;
+ }
+
+ public void setOnModifyConsumer(Consumer<Path> consumer) {
+ this.onModifyConsumer = consumer;
+ }
+
+ public void setOnDeleteConsumer(Consumer<Path> consumer) {
+ this.onDeleteConsumer = consumer;
+ }
+
private void handleCreate(Path filePath) {
- // TODO(cmy): implement if needed
+ if (onCreateConsumer != null) {
+ try {
+ onCreateConsumer.accept(filePath);
+ } catch (Exception e) {
+ LOG.error("Error in onCreateConsumer for file created in
directory: " + filePath, e);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File created in directory but no onCreateConsumer
set: " + filePath);
+ }
+ }
}
private void handleDelete(Path filePath) {
- // TODO(cmy): implement if needed
+ if (onDeleteConsumer != null) {
+ try {
+ onDeleteConsumer.accept(filePath);
+ } catch (Exception e) {
+ LOG.error("Error in onDeleteConsumer for file deleted from
directory: " + filePath, e);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File deleted from directory but no onDeleteConsumer
set: " + filePath);
+ }
+ }
}
private void handleModify(Path filePath) {
- // TODO(cmy): implement if needed
+ if (onModifyConsumer != null) {
+ try {
+ onModifyConsumer.accept(filePath);
+ } catch (Exception e) {
+ LOG.error("Error in onModifyConsumer for file modified in
directory: " + filePath, e);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File modified in directory but no onModifyConsumer
set: " + filePath);
+ }
+ }
}
// for test
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileCacheAdmissionManager.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileCacheAdmissionManager.java
new file mode 100644
index 00000000000..11cd15e0d30
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileCacheAdmissionManager.java
@@ -0,0 +1,721 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ConfigWatcher;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class FileCacheAdmissionManager {
+ private static final Logger LOG =
LogManager.getLogger(FileCacheAdmissionManager.class);
+
+ public enum RuleType {
+ EXCLUDE(0),
+ INCLUDE(1);
+
+ private final int value;
+
+ RuleType(int value) {
+ this.value = value;
+ }
+
+ public static RuleType fromValue(int value) {
+ if (value == 0) {
+ return EXCLUDE;
+ } else if (value == 1) {
+ return INCLUDE;
+ }
+ throw new IllegalArgumentException("Invalid RuleType Value: " +
value);
+ }
+ }
+
+ public enum RuleLevel {
+ PARTITION, // 0
+ TABLE, // 1
+ DATABASE, // 2
+ CATALOG, // 3
+ GLOBAL, // 4
+ INVALID // 5
+ }
+
+ public static class RulePattern {
+ private final long id;
+ private final String userIdentity;
+ private final String catalog;
+ private final String database;
+ private final String table;
+ private final String partitionPattern;
+ private final RuleType ruleType;
+
+ public RulePattern(long id, String userIdentity, String catalog,
String database,
+ String table, String partitionPattern, RuleType
ruleType) {
+ this.id = id;
+ this.userIdentity = userIdentity;
+ this.catalog = catalog != null ? catalog : "";
+ this.database = database != null ? database : "";
+ this.table = table != null ? table : "";
+ this.partitionPattern = partitionPattern != null ?
partitionPattern : "";
+ this.ruleType = ruleType;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String getUserIdentity() {
+ return userIdentity;
+ }
+
+ public String getCatalog() {
+ return catalog;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getPartitionPattern() {
+ return partitionPattern;
+ }
+
+ public RuleType getRuleType() {
+ return ruleType;
+ }
+ }
+
+ public static class AdmissionRule {
+ private final long id;
+ private final String userIdentity;
+ private final String catalog;
+ private final String database;
+ private final String table;
+ private final String partitionPattern;
+ private final RuleType ruleType;
+ private final boolean enabled;
+ private final long createdTime;
+ private final long updatedTime;
+
+ @JsonCreator
+ public AdmissionRule(
+ @JsonProperty("id") long id,
+ @JsonProperty("user_identity") String userIdentity,
+ @JsonProperty("catalog_name") String catalog,
+ @JsonProperty("database_name") String database,
+ @JsonProperty("table_name") String table,
+ @JsonProperty("partition_pattern") String partitionPattern,
+ @JsonProperty("rule_type") int ruleType,
+ @JsonProperty("enabled") boolean enabled,
+ @JsonProperty("created_time") long createdTime,
+ @JsonProperty("updated_time") long updatedTime) {
+ this.id = id;
+ this.userIdentity = userIdentity != null ? userIdentity : "";
+ this.catalog = catalog != null ? catalog : "";
+ this.database = database != null ? database : "";
+ this.table = table != null ? table : "";
+ this.partitionPattern = partitionPattern != null ?
partitionPattern : "";
+ this.ruleType = RuleType.fromValue(ruleType);
+ this.enabled = enabled;
+ this.createdTime = createdTime;
+ this.updatedTime = updatedTime;
+ }
+
+ public RulePattern toRulePattern() {
+ return new RulePattern(id, userIdentity, catalog, database, table,
partitionPattern, ruleType);
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String getUserIdentity() {
+ return userIdentity;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getCatalog() {
+ return catalog;
+ }
+
+ public String getPartitionPattern() {
+ return partitionPattern;
+ }
+
+ public RuleType getRuleType() {
+ return ruleType;
+ }
+
+ public boolean getEnabled() {
+ return enabled;
+ }
+
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ public long getUpdatedTime() {
+ return updatedTime;
+ }
+ }
+
+ public static class RuleLoader {
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ public static List<AdmissionRule> loadRulesFromFile(String filePath)
throws Exception {
+ File file = new File(filePath);
+ if (!file.exists()) {
+ throw new IllegalArgumentException("File cache admission JSON
file does not exist: " + filePath);
+ }
+
+ return MAPPER.readValue(file, new
TypeReference<List<AdmissionRule>>() {});
+ }
+ }
+
+ public static class RuleCollection {
+ private boolean excludeGlobal = false;
+ private final Set<String> excludeCatalogRules = new HashSet<>();
+ private final Map<String, Set<String>> excludeDatabaseRules = new
HashMap<>();
+ private final Map<String, Set<String>> excludeTableRules = new
HashMap<>();
+
+ private boolean includeGlobal = false;
+ private final Set<String> includeCatalogRules = new HashSet<>();
+ private final Map<String, Set<String>> includeDatabaseRules = new
HashMap<>();
+ private final Map<String, Set<String>> includeTableRules = new
HashMap<>();
+
+ public static final String REASON_COMMON_CATALOG_BLACKLIST = "common
catalog-level blacklist rule";
+ public static final String REASON_COMMON_CATALOG_WHITELIST = "common
catalog-level whitelist rule";
+ public static final String REASON_COMMON_DATABASE_BLACKLIST = "common
database-level blacklist rule";
+ public static final String REASON_COMMON_DATABASE_WHITELIST = "common
database-level whitelist rule";
+ public static final String REASON_COMMON_TABLE_BLACKLIST = "common
table-level blacklist rule";
+ public static final String REASON_COMMON_TABLE_WHITELIST = "common
table-level whitelist rule";
+ public static final String REASON_USER_GLOBAL_BLACKLIST = "user
global-level blacklist rule";
+ public static final String REASON_USER_GLOBAL_WHITELIST = "user
global-level whitelist rule";
+ public static final String REASON_USER_CATALOG_BLACKLIST = "user
catalog-level blacklist rule";
+ public static final String REASON_USER_CATALOG_WHITELIST = "user
catalog-level whitelist rule";
+ public static final String REASON_USER_DATABASE_BLACKLIST = "user
database-level blacklist rule";
+ public static final String REASON_USER_DATABASE_WHITELIST = "user
database-level whitelist rule";
+ public static final String REASON_USER_TABLE_BLACKLIST = "user
table-level blacklist rule";
+ public static final String REASON_USER_TABLE_WHITELIST = "user
table-level whitelist rule";
+ public static final String REASON_DEFAULT = "default rule";
+
+ public boolean isAdmittedAtTableLevel(String userIdentity, String
catalog, String database, String table,
+ AtomicReference<String> reason) {
+
+ String catalogDatabase = catalog + "." + database;
+
+ if (containsKeyValue(excludeTableRules, table, catalogDatabase)) {
+ reason.set(REASON_COMMON_TABLE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(includeTableRules, table, catalogDatabase)) {
+ reason.set(REASON_COMMON_TABLE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (containsKeyValue(excludeDatabaseRules, database, catalog)) {
+ reason.set(REASON_COMMON_DATABASE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(includeDatabaseRules, database, catalog)) {
+ reason.set(REASON_COMMON_DATABASE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (excludeCatalogRules.contains(catalog)) {
+ reason.set(REASON_COMMON_CATALOG_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (includeCatalogRules.contains(catalog)) {
+ reason.set(REASON_COMMON_CATALOG_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+
+ reason.set(REASON_DEFAULT);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+
+ public boolean isAdmittedAtTableLevel(RuleCollection userCollection,
String userIdentity,
+ String catalog, String database,
String table,
+ AtomicReference<String> reason) {
+
+ String catalogDatabase = catalog + "." + database;
+
+ if (containsKeyValue(excludeTableRules, table, catalogDatabase)) {
+ reason.set(REASON_COMMON_TABLE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(userCollection.excludeTableRules, table,
catalogDatabase)) {
+ reason.set(REASON_USER_TABLE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(includeTableRules, table, catalogDatabase)) {
+ reason.set(REASON_COMMON_TABLE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (containsKeyValue(userCollection.includeTableRules, table,
catalogDatabase)) {
+ reason.set(REASON_USER_TABLE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (containsKeyValue(excludeDatabaseRules, database, catalog)) {
+ reason.set(REASON_COMMON_DATABASE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(userCollection.excludeDatabaseRules,
database, catalog)) {
+ reason.set(REASON_USER_DATABASE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(includeDatabaseRules, database, catalog)) {
+ reason.set(REASON_COMMON_DATABASE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (containsKeyValue(userCollection.includeDatabaseRules,
database, catalog)) {
+ reason.set(REASON_USER_DATABASE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (excludeCatalogRules.contains(catalog)) {
+ reason.set(REASON_COMMON_CATALOG_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (userCollection.excludeCatalogRules.contains(catalog)) {
+ reason.set(REASON_USER_CATALOG_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (includeCatalogRules.contains(catalog)) {
+ reason.set(REASON_COMMON_CATALOG_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (userCollection.includeCatalogRules.contains(catalog)) {
+ reason.set(REASON_USER_CATALOG_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (userCollection.excludeGlobal) {
+ reason.set(REASON_USER_GLOBAL_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (userCollection.includeGlobal) {
+ reason.set(REASON_USER_GLOBAL_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+
+ reason.set(REASON_DEFAULT);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+
+ private boolean containsKeyValue(Map<String, Set<String>> map, String
key, String value) {
+ Set<String> set = map.get(key);
+ return set != null && set.contains(value);
+ }
+
+ private void logAdmission(boolean admitted, String userIdentity,
String catalog, String database,
+ String table, String reason) {
+ if (LOG.isDebugEnabled()) {
+ String status = admitted ? "admitted" : "denied";
+
+ String logMessage = String.format(
+ "File cache request %s by %s, user_identity: %s, "
+ + "catalog: %s, database: %s, table: %s",
+ status, reason, userIdentity, catalog, database,
table);
+
+ LOG.debug(logMessage);
+ }
+ }
+
+ public RuleLevel getRuleLevel(RulePattern rulePattern) {
+ int pattern = 0;
+ if (!rulePattern.getPartitionPattern().isEmpty()) {
+ pattern |= 1;
+ }
+ if (!rulePattern.getTable().isEmpty()) {
+ pattern |= 1 << 1;
+ }
+ if (!rulePattern.getDatabase().isEmpty()) {
+ pattern |= 1 << 2;
+ }
+ if (!rulePattern.getCatalog().isEmpty()) {
+ pattern |= 1 << 3;
+ }
+
+ RuleLevel[] levelTable = {
+ /* 0000 */ RuleLevel.GLOBAL, // 0
+ /* 0001 */ RuleLevel.INVALID, // 1
+ /* 0010 */ RuleLevel.INVALID, // 2
+ /* 0011 */ RuleLevel.INVALID, // 3
+ /* 0100 */ RuleLevel.INVALID, // 4
+ /* 0101 */ RuleLevel.INVALID, // 5
+ /* 0110 */ RuleLevel.INVALID, // 6
+ /* 0111 */ RuleLevel.INVALID, // 7
+ /* 1000 */ RuleLevel.CATALOG, // 8
+ /* 1001 */ RuleLevel.INVALID, // 9
+ /* 1010 */ RuleLevel.INVALID, // 10
+ /* 1011 */ RuleLevel.INVALID, // 11
+ /* 1100 */ RuleLevel.DATABASE, // 12
+ /* 1101 */ RuleLevel.INVALID, // 13
+ /* 1110 */ RuleLevel.TABLE, // 14
+ /* 1111 */ RuleLevel.PARTITION // 15
+ };
+
+ return levelTable[pattern];
+ }
+
+ public void add(RulePattern rulePattern) {
+ RuleLevel ruleLevel = getRuleLevel(rulePattern);
+ if (ruleLevel == RuleLevel.INVALID) {
+ return;
+ }
+
+ Set<String> catalogRules = (rulePattern.getRuleType() ==
RuleType.EXCLUDE)
+ ? excludeCatalogRules : includeCatalogRules;
+ Map<String, Set<String>> databaseRules =
(rulePattern.getRuleType() == RuleType.EXCLUDE)
+ ? excludeDatabaseRules : includeDatabaseRules;
+ Map<String, Set<String>> tableRules = (rulePattern.getRuleType()
== RuleType.EXCLUDE)
+ ? excludeTableRules : includeTableRules;
+
+ switch (ruleLevel) {
+ case GLOBAL:
+ if (rulePattern.getRuleType() == RuleType.EXCLUDE) {
+ excludeGlobal = true;
+ } else {
+ includeGlobal = true;
+ }
+ break;
+ case CATALOG:
+ catalogRules.add(rulePattern.getCatalog());
+ break;
+ case DATABASE:
+ databaseRules.computeIfAbsent(rulePattern.getDatabase(), k
-> new HashSet<>())
+ .add(rulePattern.getCatalog());
+ break;
+ case TABLE:
+ String catalogDatabase = rulePattern.getCatalog() + "." +
rulePattern.getDatabase();
+ tableRules.computeIfAbsent(rulePattern.getTable(), k ->
new HashSet<>())
+ .add(catalogDatabase);
+ break;
+ case PARTITION:
+ // TODO: Implementing partition-level rules
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ // Rule management supporting concurrent reads and writes.
+ // Thread safety is provided by the ReentrantReadWriteLock in
FileCacheAdmissionManager.
+ public static class RuleManager {
+ // Characters in ASCII order: A-Z, then other symbols, then a-z
+ private static final int PARTITION_COUNT = 58;
+ private final List<Map<String, RuleCollection>> maps;
+ private final RuleCollection commonCollection;
+
+ static List<String> otherReasons = new ArrayList<>(Arrays.asList(
+ "empty user_identity",
+ "invalid user_identity"
+ ));
+
+ public RuleManager() {
+ maps = new ArrayList<>(PARTITION_COUNT);
+ commonCollection = new RuleCollection();
+
+ for (int i = 0; i < PARTITION_COUNT; i++) {
+ maps.add(new HashMap<>());
+ }
+ }
+
+ private int getIndex(char firstChar) {
+ return firstChar - 'A';
+ }
+
+ public void initialize(List<AdmissionRule> rules) {
+ for (AdmissionRule rule : rules) {
+ if (!rule.getEnabled()) {
+ continue;
+ }
+
+ RulePattern rulePattern = rule.toRulePattern();
+
+ if (rulePattern.getUserIdentity().isEmpty()) {
+ commonCollection.add(rulePattern);
+ continue;
+ }
+
+ char firstChar = rulePattern.getUserIdentity().charAt(0);
+ if (!((firstChar >= 'A' && firstChar <= 'Z') || (firstChar >=
'a' && firstChar <= 'z'))) {
+ continue;
+ }
+
+ int index = getIndex(firstChar);
+ maps.get(index).computeIfAbsent(rulePattern.getUserIdentity(),
+ k -> new RuleCollection()).add(rulePattern);
+ }
+ }
+
+ public boolean isAdmittedAtTableLevel(String userIdentity, String
catalog, String database, String table,
+ AtomicReference<String> reason) {
+ if (userIdentity.isEmpty()) {
+ reason.set(otherReasons.get(0));
+ logDefaultAdmission(userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+
+ char firstChar = userIdentity.charAt(0);
+ if (!((firstChar >= 'A' && firstChar <= 'Z') || (firstChar >= 'a'
&& firstChar <= 'z'))) {
+ reason.set(otherReasons.get(1));
+ logDefaultAdmission(userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+
+ int index = getIndex(firstChar);
+ RuleCollection collection = maps.get(index).get(userIdentity);
+ if (collection == null) {
+ return commonCollection.isAdmittedAtTableLevel(userIdentity,
catalog, database, table, reason);
+ } else {
+ return commonCollection.isAdmittedAtTableLevel(
+ collection, userIdentity, catalog, database, table,
reason);
+ }
+ }
+
+ private void logDefaultAdmission(String userIdentity, String catalog,
String database, String table,
+ String reason) {
+ if (LOG.isDebugEnabled()) {
+ String decision = "denied";
+
+ String logMessage = String.format(
+ "File cache request %s by default rule, "
+ + "user_identity: %s, catalog: %s, database: %s,
table: %s, reason: %s",
+ decision, userIdentity, catalog, database, table,
reason);
+
+ LOG.debug(logMessage);
+ }
+ }
+ }
+
+ private RuleManager ruleManager;
+
+ private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock =
rwLock.writeLock();
+
+ private static final FileCacheAdmissionManager INSTANCE = new
FileCacheAdmissionManager();
+
+ private ConfigWatcher watcher;
+
+ public FileCacheAdmissionManager() {
+ this.ruleManager = new RuleManager();
+ }
+
+ public static FileCacheAdmissionManager getInstance() {
+ return INSTANCE;
+ }
+
+ public void initialize(List<AdmissionRule> rules) {
+ ruleManager.initialize(rules);
+ }
+
+ public boolean isAdmittedAtTableLevel(String userIdentity, String catalog,
String database, String table,
+ AtomicReference<String> reason) {
+ readLock.lock();
+ try {
+ return ruleManager.isAdmittedAtTableLevel(userIdentity, catalog,
database, table, reason);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public void loadRules(String filePath) {
+ if (filePath == null || filePath.isEmpty()) {
+ LOG.warn("File cache admission JSON file path is not configured,
admission control will be disabled.");
+ return;
+ }
+
+ try {
+ List<AdmissionRule> loadedRules =
RuleLoader.loadRulesFromFile(filePath);
+ LOG.info("{} rules loaded successfully from file: {}",
loadedRules.size(), filePath);
+
+ RuleManager newRuleManager = new RuleManager();
+ newRuleManager.initialize(loadedRules);
+
+ writeLock.lock();
+ try {
+ ruleManager = newRuleManager;
+ } finally {
+ writeLock.unlock();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to load file cache admission rules from file:
{}", filePath, e);
+ }
+ }
+
+ public void loadRules() {
+ if (Config.file_cache_admission_control_json_dir == null
+ || Config.file_cache_admission_control_json_dir.isEmpty())
{
+ LOG.warn("File cache admission JSON directory is not configured,
admission control will be disabled.");
+ return;
+ }
+
+ try {
+ File ruleDir = new
File(Config.file_cache_admission_control_json_dir);
+
+ if (!ruleDir.exists()) {
+ LOG.warn("File cache admission JSON directory does not exist:
{}",
+ Config.file_cache_admission_control_json_dir);
+ return;
+ }
+
+ if (!ruleDir.isDirectory()) {
+ LOG.error("File cache admission JSON directory is not a
directory: {}",
+ Config.file_cache_admission_control_json_dir);
+ return;
+ }
+
+ File[] jsonFiles = ruleDir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.toLowerCase().endsWith(".json");
+ }
+ });
+
+ if (jsonFiles == null) {
+ LOG.error("Failed to list JSON files in directory: {}",
+ Config.file_cache_admission_control_json_dir);
+ return;
+ }
+
+ LOG.info("Found {} JSON files in admission rule directory: {}",
+ jsonFiles.length,
Config.file_cache_admission_control_json_dir);
+
+ List<AdmissionRule> allRules = new ArrayList<>();
+
+ // Duplicate rule handling: only rules with `enabled=true` are
stored.
+ // A rule is considered duplicate if its `userIdentity`,
`catalog`, `database`, `table`,
+ // `partitionPattern`, and 'ruleType' all match another rule,
regardless of their `enabled` flag.
+ // Duplicate enabled rules are automatically deduplicated during
processing.
+ for (File jsonFile : jsonFiles) {
+ List<AdmissionRule> loadedRules =
RuleLoader.loadRulesFromFile(jsonFile.getPath());
+ LOG.info("{} rules loaded successfully from JSON file: {}",
loadedRules.size(),
+ jsonFile.getPath());
+
+ allRules.addAll(loadedRules);
+ }
+
+ RuleManager newRuleManager = new RuleManager();
+ newRuleManager.initialize(allRules);
+
+ writeLock.lock();
+ try {
+ ruleManager = newRuleManager;
+ } finally {
+ writeLock.unlock();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to load file cache admission rules from
directory: {}",
+ Config.file_cache_admission_control_json_dir, e);
+ }
+ }
+
+ // Reloads all JSON rules and replaces the RuleManager
+ // when any .json file is created, modified, or deleted.
+ public void loadOnStartup() {
+ if (Config.file_cache_admission_control_json_dir == null
+ || Config.file_cache_admission_control_json_dir.isEmpty()) {
+ LOG.warn("File cache admission JSON directory is not configured,
skip loading.");
+ return;
+ }
+
+ File ruleDir = new File(Config.file_cache_admission_control_json_dir);
+ if (!ruleDir.exists() || !ruleDir.isDirectory()) {
+ LOG.warn("File cache admission JSON directory does not exist or is
not a directory: {}, skip loading.",
+ Config.file_cache_admission_control_json_dir);
+ return;
+ }
+
+ LOG.info("Loading file cache admission rules...");
+ loadRules();
+
+ LOG.info("Starting file cache admission rules refreshing task");
+ watcher = new
ConfigWatcher(Config.file_cache_admission_control_json_dir);
+ watcher.setOnCreateConsumer(filePath -> {
+ String fileName = filePath.toString();
+ if (fileName.endsWith(".json")) {
+ loadRules();
+ }
+ });
+ watcher.setOnDeleteConsumer(filePath -> {
+ String fileName = filePath.toString();
+ if (fileName.endsWith(".json")) {
+ loadRules();
+ }
+ });
+ watcher.setOnModifyConsumer(filePath -> {
+ String fileName = filePath.toString();
+ if (fileName.endsWith(".json")) {
+ loadRules();
+ }
+ });
+ watcher.start();
+
+ LOG.info("Started file cache admission rules refreshing task");
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 4658c2c4dfc..a38305b7ea5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -22,13 +22,16 @@ import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
@@ -68,12 +71,16 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.URI;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
/**
* FileQueryScanNode for querying the file access type of catalog, now only
support
@@ -99,6 +106,12 @@ public abstract class FileQueryScanNode extends
FileScanNode {
protected FileSplitter fileSplitter;
+ // The data cache function only works for queries on Hive, Iceberg,
Hudi(via HMS), and Paimon tables.
+ // See: https://doris.incubator.apache.org/docs/dev/lakehouse/data-cache
+ private static final Set<String> CACHEABLE_CATALOGS = new HashSet<>(
+ Arrays.asList("hms", "iceberg", "paimon")
+ );
+
/**
* External file scan node for Query hms table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check
column priv
@@ -317,11 +330,18 @@ public abstract class FileQueryScanNode extends
FileScanNode {
int numBackends = backendPolicy.numBackends();
List<String> pathPartitionKeys = getPathPartitionKeys();
+
+ boolean admissionResult = true;
+ if (ConnectContext.get().getSessionVariable().isEnableFileCache()
+ && Config.enable_file_cache_admission_control) {
+ admissionResult = fileCacheAdmissionCheck();
+ }
+
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while
scanning.
// Only provide the unique ID of split source to backend.
- splitAssignment = new SplitAssignment(
- backendPolicy, this, this::splitToScanRange,
locationProperties, pathPartitionKeys);
+ splitAssignment = new SplitAssignment(backendPolicy, this,
this::splitToScanRange,
+ locationProperties, pathPartitionKeys, admissionResult);
splitAssignment.init();
if (executor != null) {
executor.getSummaryProfile().setGetSplitsFinishTime();
@@ -375,7 +395,8 @@ public abstract class FileQueryScanNode extends
FileScanNode {
for (Backend backend : assignment.keySet()) {
Collection<Split> splits = assignment.get(backend);
for (Split split : splits) {
- scanRangeLocations.add(splitToScanRange(backend,
locationProperties, split, pathPartitionKeys));
+ scanRangeLocations.add(splitToScanRange(backend,
locationProperties, split, pathPartitionKeys,
+ admissionResult));
totalFileSize += split.getLength();
}
scanBackendIds.add(backend.getId());
@@ -400,7 +421,8 @@ public abstract class FileQueryScanNode extends
FileScanNode {
Backend backend,
Map<String, String> locationProperties,
Split split,
- List<String> pathPartitionKeys) throws UserException {
+ List<String> pathPartitionKeys,
+ boolean admissionResult) throws UserException {
FileSplit fileSplit = (FileSplit) split;
TScanRangeLocations curLocations = newLocations();
// If fileSplit has partition values, use the values collected from
hive partitions.
@@ -420,6 +442,7 @@ public abstract class FileQueryScanNode extends
FileScanNode {
// set file format type, and the type might fall back to native format
in setScanParams
rangeDesc.setFormatType(getFileFormatType());
setScanParams(rangeDesc, fileSplit);
+ rangeDesc.setFileCacheAdmission(admissionResult);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
@@ -657,6 +680,55 @@ public abstract class FileQueryScanNode extends
FileScanNode {
return this.scanParams;
}
+ protected boolean fileCacheAdmissionCheck() throws UserException {
+ boolean admissionResultAtTableLevel = true;
+ TableIf tableIf = getTargetTable();
+ String table = tableIf.getName();
+
+ if (tableIf instanceof ExternalTable) {
+ ExternalTable externalTableIf = (ExternalTable) tableIf;
+ String database = tableIf.getDatabase().getFullName();
+ String catalog = externalTableIf.getCatalog().getName();
+
+ if
(CACHEABLE_CATALOGS.contains(externalTableIf.getCatalog().getType())) {
+ UserIdentity currentUser =
ConnectContext.get().getCurrentUserIdentity();
+ String userIdentity = currentUser.getQualifiedUser() + "@" +
currentUser.getHost();
+
+ AtomicReference<String> reason = new AtomicReference<>("");
+
+ long startTime = System.nanoTime();
+
+ admissionResultAtTableLevel =
FileCacheAdmissionManager.getInstance().isAdmittedAtTableLevel(
+ userIdentity, catalog, database, table, reason);
+
+ long endTime = System.nanoTime();
+ double durationMs = (double) (endTime - startTime) / 1_000_000;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File cache admission control cost {} ms",
String.format("%.6f", durationMs));
+ }
+
+ addFileCacheAdmissionLog(userIdentity,
admissionResultAtTableLevel, reason.get(), durationMs);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip file cache admission control for
non-cacheable table: {}.{}.{}",
+ catalog, database, table);
+ }
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ DatabaseIf databaseIf = tableIf.getDatabase();
+ String database = databaseIf == null ? "null" :
databaseIf.getFullName();
+ String catalog = databaseIf == null || databaseIf.getCatalog()
== null
+ ? "null" : databaseIf.getCatalog().getName();
+ LOG.debug("Skip file cache admission control for non-external
table: {}.{}.{}",
+ catalog, database, table);
+ }
+ }
+
+ return admissionResultAtTableLevel;
+ }
+
protected long applyMaxFileSplitNumLimit(long targetSplitSize, long
totalFileSize) {
int maxFileSplitNum = sessionVariable.getMaxFileSplitNum();
if (maxFileSplitNum <= 0 || totalFileSize <= 0) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index cbf2420cead..ca64c7457e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -54,6 +54,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -72,6 +73,8 @@ public abstract class FileScanNode extends ExternalScanNode {
// For display pushdown agg result
protected long tableLevelRowCount = -1;
+ protected List<String> fileCacheAdmissionLogs;
+
public FileScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
ScanContext scanContext, boolean needCheckColumnPriv) {
this(id, desc, planNodeName, StatisticalType.DEFAULT, scanContext,
needCheckColumnPriv);
@@ -81,6 +84,7 @@ public abstract class FileScanNode extends ExternalScanNode {
StatisticalType statisticalType, ScanContext scanContext, boolean
needCheckColumnPriv) {
super(id, desc, planNodeName, statisticalType, scanContext,
needCheckColumnPriv);
this.needCheckColumnPriv = needCheckColumnPriv;
+ this.fileCacheAdmissionLogs = new ArrayList<>();
}
@Override
@@ -231,6 +235,11 @@ public abstract class FileScanNode extends
ExternalScanNode {
.map(node -> node.getId().asInt() +
"").collect(Collectors.toList()));
output.append(prefix).append("TOPN
OPT:").append(topnFilterSources).append("\n");
}
+
+ for (String admissionLog : fileCacheAdmissionLogs) {
+ output.append(prefix).append(admissionLog).append("\n");
+ }
+
return output.toString();
}
@@ -301,4 +310,11 @@ public abstract class FileScanNode extends
ExternalScanNode {
}
}
}
+
+ protected void addFileCacheAdmissionLog(String userIdentity, Boolean
admitted, String reason, double durationMs) {
+ String admissionStatus = admitted ? "ADMITTED" : "DENIED";
+ String admissionLog = String.format("file cache request %s:
user_identity:%s, reason:%s, cost:%.6f ms",
+ admissionStatus, userIdentity, reason, durationMs);
+ fileCacheAdmissionLogs.add(admissionLog);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index cc17818d6b5..5f79a006a7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -53,6 +53,7 @@ public class SplitAssignment {
private final SplitToScanRange splitToScanRange;
private final Map<String, String> locationProperties;
private final List<String> pathPartitionKeys;
+ private final boolean fileCacheAdmission;
private final Object assignLock = new Object();
private Split sampleSplit = null;
private final AtomicBoolean isStopped = new AtomicBoolean(false);
@@ -66,12 +67,14 @@ public class SplitAssignment {
SplitGenerator splitGenerator,
SplitToScanRange splitToScanRange,
Map<String, String> locationProperties,
- List<String> pathPartitionKeys) {
+ List<String> pathPartitionKeys,
+ boolean fileCacheAdmission) {
this.backendPolicy = backendPolicy;
this.splitGenerator = splitGenerator;
this.splitToScanRange = splitToScanRange;
this.locationProperties = locationProperties;
this.pathPartitionKeys = pathPartitionKeys;
+ this.fileCacheAdmission = fileCacheAdmission;
}
public void init() throws UserException {
@@ -107,7 +110,8 @@ public class SplitAssignment {
Collection<Split> splits = batch.get(backend);
List<TScanRangeLocations> locations = new
ArrayList<>(splits.size());
for (Split split : splits) {
- locations.add(splitToScanRange.getScanRange(backend,
locationProperties, split, pathPartitionKeys));
+ locations.add(splitToScanRange.getScanRange(backend,
locationProperties, split, pathPartitionKeys,
+ fileCacheAdmission));
}
while (needMoreSplit()) {
BlockingQueue<Collection<TScanRangeLocations>> queue =
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java
index 0e890252857..bea93e99adc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java
@@ -30,5 +30,6 @@ public interface SplitToScanRange {
Backend backend,
Map<String, String> locationProperties,
Split split,
- List<String> pathPartitionKeys) throws UserException;
+ List<String> pathPartitionKeys,
+ boolean fileCacheAdmission) throws UserException;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index dde35800b85..a5acc362365 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -61,6 +61,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.datasource.FileCacheAdmissionManager;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.dictionary.LayoutType;
import org.apache.doris.info.PartitionNamesInfo;
@@ -1115,6 +1116,8 @@ import org.antlr.v4.runtime.Token;
import org.antlr.v4.runtime.tree.ParseTree;
import org.antlr.v4.runtime.tree.RuleNode;
import org.antlr.v4.runtime.tree.TerminalNode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -1130,6 +1133,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -1138,6 +1142,8 @@ import java.util.stream.Collectors;
*/
@SuppressWarnings({"OptionalUsedAsFieldOrParameterType",
"OptionalGetWithoutIsPresent"})
public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
+ private static final Logger LOG =
LogManager.getLogger(LogicalPlanBuilder.class);
+
private static String JOB_NAME = "jobName";
private static String TASK_ID = "taskId";
@@ -9317,6 +9323,47 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
return new WarmUpClusterCommand(warmUpItems, srcCluster, dstCluster,
isForce, isWarmUpWithTable, properties);
}
+ void fileCacheAdmissionCheck(DorisParser.WarmUpSelectContext ctx) {
+ DorisParser.WarmUpSingleTableRefContext tableRef =
ctx.warmUpSingleTableRef();
+ List<String> identifierParts =
visitMultipartIdentifier(tableRef.multipartIdentifier());
+
+ int partCount = identifierParts.size();
+ String table = identifierParts.get(partCount - 1);
+ String database = (partCount >= 2)
+ ? identifierParts.get(partCount - 2) :
ConnectContext.get().getDatabase();
+ String catalog = (partCount == 3)
+ ? identifierParts.get(partCount - 3) :
ConnectContext.get().getCurrentCatalog().getName();
+
+ UserIdentity currentUser =
ConnectContext.get().getCurrentUserIdentity();
+ String userIdentity = currentUser.getQualifiedUser() + "@" +
currentUser.getHost();
+
+ if (!"internal".equals(catalog)) {
+ AtomicReference<String> reason = new AtomicReference<>("");
+
+ long startTime = System.nanoTime();
+
+ boolean admissionResultAtTableLevel =
FileCacheAdmissionManager.getInstance().isAdmittedAtTableLevel(
+ userIdentity, catalog, database, table, reason);
+
+ long endTime = System.nanoTime();
+ double durationMs = (double) (endTime - startTime) / 1_000_000;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File cache admission control cost {} ms",
String.format("%.6f", durationMs));
+ }
+
+ if (!admissionResultAtTableLevel) {
+ throw new AnalysisException("WARM UP SELECT denied by file
cache admission control, reason: "
+ + reason);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip file cache admission control for non-external
table: {}.{}",
+ database, table);
+ }
+ }
+ }
+
@Override
public LogicalPlan visitWarmUpSelect(DorisParser.WarmUpSelectContext ctx) {
LogicalPlan relation =
visitWarmUpSingleTableRef(ctx.warmUpSingleTableRef());
@@ -9344,6 +9391,10 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
+ " disable_file_cache=false in cloud mode");
}
+ if (Config.enable_file_cache_admission_control) {
+ fileCacheAdmissionCheck(ctx);
+ }
+
UnboundBlackholeSink<?> sink = new UnboundBlackholeSink<>(project,
new UnboundBlackholeSinkContext(true));
LogicalPlan command = new WarmupSelectCommand(sink);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index cd26b28bcbd..38e4059711c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -740,4 +740,8 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
public long getCatalogId() {
return Env.getCurrentInternalCatalog().getId();
}
+
+ protected boolean fileCacheAdmissionCheck() throws UserException {
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionManagerTest.java
new file mode 100644
index 00000000000..383c526ccaa
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionManagerTest.java
@@ -0,0 +1,437 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FileCacheAdmissionManagerTest {
+
+ private FileCacheAdmissionManager manager;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void setUp() {
+ manager = new FileCacheAdmissionManager();
+ }
+
+ @Test
+ public void testEmptyUserIdentity() {
+ AtomicReference<String> reason = new AtomicReference<>();
+ boolean result = manager.isAdmittedAtTableLevel("", "catalog",
"database", "table", reason);
+ Assert.assertFalse(result);
+ Assert.assertEquals("empty user_identity", reason.get());
+ }
+
+ @Test
+ public void testInvalidUserIdentity() {
+ AtomicReference<String> reason = new AtomicReference<>();
+ boolean result = manager.isAdmittedAtTableLevel("123user", "catalog",
"database", "table", reason);
+ Assert.assertFalse(result);
+ Assert.assertEquals("invalid user_identity", reason.get());
+ }
+
+ @Test
+ public void testCommonRule() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new
ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 1L, "", "catalog_1", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 2L, "", "catalog_2", "database_1", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 3L, "", "catalog_3", "database_2", "table_1", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-common.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user", "catalog_1",
"database", "table", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("common catalog-level whitelist rule",
reason1.get());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user", "catalog_2",
"database_1", "table", reason2);
+ Assert.assertTrue(result2);
+ Assert.assertEquals("common database-level whitelist rule",
reason2.get());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user", "catalog_3",
"database_2", "table_1", reason3);
+ Assert.assertTrue(result3);
+ Assert.assertEquals("common table-level whitelist rule",
reason3.get());
+ }
+
+ @Test
+ public void testRuleEnabled() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new
ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 1L, "", "catalog_1", "", "", "",
+ 1, false, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 2L, "", "catalog_2", "database_1", "", "",
+ 1, false, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 3L, "", "catalog_3", "database_2", "table_1", "",
+ 1, false, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-enabled.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user", "catalog_1",
"database", "table", reason1);
+ Assert.assertFalse(result1);
+ Assert.assertEquals("default rule", reason1.get());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user", "catalog_2",
"database_1", "table", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("default rule", reason2.get());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user", "catalog_3",
"database_2", "table_1", reason3);
+ Assert.assertFalse(result3);
+ Assert.assertEquals("default rule", reason3.get());
+ }
+
+ @Test
+ public void testUserRule() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new
ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 4L, "user_1", "catalog_4", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 5L, "user_1", "catalog_5", "database_4", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 6L, "user_1", "catalog_6", "database_5", "table_4", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-user.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_1",
"catalog_4", "database", "table", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("user catalog-level whitelist rule",
reason1.get());
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_2",
"catalog_4", "database", "table", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("default rule", reason2.get());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_1",
"catalog_5", "database_4", "table", reason3);
+ Assert.assertTrue(result3);
+ Assert.assertEquals("user database-level whitelist rule",
reason3.get());
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_2",
"catalog_5", "database_4", "table", reason4);
+ Assert.assertFalse(result4);
+ Assert.assertEquals("default rule", reason4.get());
+
+ AtomicReference<String> reason5 = new AtomicReference<>();
+ boolean result5 = manager.isAdmittedAtTableLevel("user_1",
"catalog_6", "database_5", "table_4", reason5);
+ Assert.assertTrue(result5);
+ Assert.assertEquals("user table-level whitelist rule", reason5.get());
+ AtomicReference<String> reason6 = new AtomicReference<>();
+ boolean result6 = manager.isAdmittedAtTableLevel("user_2",
"catalog_6", "database_5", "table_4", reason6);
+ Assert.assertFalse(result6);
+ Assert.assertEquals("default rule", reason6.get());
+ }
+
+ @Test
+ public void testRuleLevelPriority() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new
ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 7L, "user_3", "", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-priority.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_3", "catalog",
"database", "table", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("user global-level whitelist rule", reason1.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 8L, "user_3", "catalog", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_3", "catalog",
"database", "table", reason2);
+ Assert.assertTrue(result2);
+ Assert.assertEquals("user catalog-level whitelist rule",
reason2.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 9L, "user_3", "catalog", "database", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_3", "catalog",
"database", "table", reason3);
+ Assert.assertTrue(result3);
+ Assert.assertEquals("user database-level whitelist rule",
reason3.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 10L, "user_3", "catalog", "database", "table", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_3", "catalog",
"database", "table", reason4);
+ Assert.assertTrue(result4);
+ Assert.assertEquals("user table-level whitelist rule", reason4.get());
+ }
+
+ @Test
+ public void testRuleTypePriority() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new
ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 11L, "user_4", "", "", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 12L, "user_4", "", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-type-priority.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_4", "catalog",
"database", "table", reason1);
+ Assert.assertFalse(result1);
+ Assert.assertEquals("user global-level blacklist rule", reason1.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 13L, "user_4", "catalog", "", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 14L, "user_4", "catalog", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_4", "catalog",
"database", "table", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("user catalog-level blacklist rule",
reason2.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 15L, "user_4", "catalog", "database", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 16L, "user_4", "catalog", "database", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_4", "catalog",
"database", "table", reason3);
+ Assert.assertFalse(result3);
+ Assert.assertEquals("user database-level blacklist rule",
reason3.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 17L, "user_4", "catalog", "database", "table", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 18L, "user_4", "catalog", "database", "table", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_4", "catalog",
"database", "table", reason4);
+ Assert.assertFalse(result4);
+ Assert.assertEquals("user table-level blacklist rule", reason4.get());
+ }
+
+ @Test
+ public void testNestedRulePriorities() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new
ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 19L, "user_5", "catalog", "", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 20L, "user_5", "catalog", "database", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 21L, "user_6", "catalog", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 22L, "user_6", "catalog", "database", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 23L, "user_7", "catalog", "database", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 24L, "user_7", "catalog", "database", "table", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 25L, "user_8", "catalog", "database", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 26L, "user_8", "catalog", "database", "table", "",
+ 0, true, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-nested.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_5", "catalog",
"database", "table", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("user database-level whitelist rule",
reason1.get());
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_5", "catalog",
"otherDatabase", "table", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("user catalog-level blacklist rule",
reason2.get());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_6", "catalog",
"database", "table", reason3);
+ Assert.assertFalse(result3);
+ Assert.assertEquals("user database-level blacklist rule",
reason3.get());
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_6", "catalog",
"otherDatabase", "table", reason4);
+ Assert.assertTrue(result4);
+ Assert.assertEquals("user catalog-level whitelist rule",
reason4.get());
+
+ AtomicReference<String> reason5 = new AtomicReference<>();
+ boolean result5 = manager.isAdmittedAtTableLevel("user_7", "catalog",
"database", "table", reason5);
+ Assert.assertTrue(result5);
+ Assert.assertEquals("user table-level whitelist rule", reason5.get());
+ AtomicReference<String> reason6 = new AtomicReference<>();
+ boolean result6 = manager.isAdmittedAtTableLevel("user_7", "catalog",
"database", "otherTable", reason6);
+ Assert.assertFalse(result6);
+ Assert.assertEquals("user database-level blacklist rule",
reason6.get());
+
+ AtomicReference<String> reason7 = new AtomicReference<>();
+ boolean result7 = manager.isAdmittedAtTableLevel("user_8", "catalog",
"database", "table", reason7);
+ Assert.assertFalse(result7);
+ Assert.assertEquals("user table-level blacklist rule", reason7.get());
+ AtomicReference<String> reason8 = new AtomicReference<>();
+ boolean result8 = manager.isAdmittedAtTableLevel("user_8", "catalog",
"database", "otherTable", reason8);
+ Assert.assertTrue(result8);
+ Assert.assertEquals("user database-level whitelist rule",
reason8.get());
+ }
+
+ @AfterClass
+ public static void deleteJsonFile() throws Exception {
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionRuleRefresherTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionRuleRefresherTest.java
new file mode 100644
index 00000000000..a5325a36f8c
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionRuleRefresherTest.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.awaitility.Awaitility;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FileCacheAdmissionRuleRefresherTest {
+
+ private static FileCacheAdmissionManager manager;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ Path currentDir = Paths.get("").toAbsolutePath();
+ Path jsonFileDir = currentDir.resolve("jsonFileDir-test");
+
+ if (!Files.exists(jsonFileDir)) {
+ Files.createDirectories(jsonFileDir);
+ System.out.println("Directory created successfully: " +
jsonFileDir.toAbsolutePath());
+ }
+
+ Config.file_cache_admission_control_json_dir = jsonFileDir.toString();
+
+ manager = new FileCacheAdmissionManager();
+ manager.loadOnStartup();
+ }
+
+ @Test
+ public void testJsonFileCreated() throws Exception {
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_1",
"catalog_1", "database_1", "table_1", reason1);
+ Assert.assertFalse(result1);
+ Assert.assertEquals("default rule", reason1.get());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_2",
"catalog_2", "database_2", "table_2", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("default rule", reason2.get());
+
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new
ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 1L, "user_1", "catalog_1", "database_1", "table_1", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ File jsonFile1 = new
File(Config.file_cache_admission_control_json_dir, "rules_1.json");
+ objectMapper.writeValue(jsonFile1, rules);
+
+ rules.clear();
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 2L, "user_2", "catalog_2", "database_2", "table_2", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ File jsonFile2 = new
File(Config.file_cache_admission_control_json_dir, "rules_2.json");
+ objectMapper.writeValue(jsonFile2, rules);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_1",
"catalog_1", "database_1", "table_1", reason3);
+ Assert.assertTrue(result3);
+ Assert.assertEquals("user table-level whitelist rule",
reason3.get());
+
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_2",
"catalog_2", "database_2", "table_2", reason4);
+ Assert.assertTrue(result4);
+ Assert.assertEquals("user table-level whitelist rule",
reason4.get());
+ });
+ }
+
+ @Test
+ public void testJsonFileDeleted() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new
ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 3L, "user_3", "catalog_3", "database_3", "table_3", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ File jsonFile3 = new
File(Config.file_cache_admission_control_json_dir, "rules_3.json");
+ objectMapper.writeValue(jsonFile3, rules);
+
+ rules.clear();
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 4L, "user_4", "catalog_4", "database_4", "table_4", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ File jsonFile4 = new
File(Config.file_cache_admission_control_json_dir, "rules_4.json");
+ objectMapper.writeValue(jsonFile4, rules);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_3",
"catalog_3", "database_3", "table_3", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("user table-level whitelist rule",
reason1.get());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_4",
"catalog_4", "database_4", "table_4", reason2);
+ Assert.assertTrue(result2);
+ Assert.assertEquals("user table-level whitelist rule",
reason2.get());
+ });
+
+ Assert.assertTrue(jsonFile4.delete());
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_3",
"catalog_3", "database_3", "table_3", reason3);
+ Assert.assertTrue(result3);
+ Assert.assertEquals("user table-level whitelist rule",
reason3.get());
+
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_4",
"catalog_4", "database_4", "table_4", reason4);
+ Assert.assertFalse(result4);
+ Assert.assertEquals("default rule", reason4.get());
+ });
+ }
+
+ @Test
+ public void testJsonFileModified() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new
ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 5L, "user_5", "catalog_5", "database_5", "table_5", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ File jsonFile5 = new
File(Config.file_cache_admission_control_json_dir, "rules_5.json");
+ objectMapper.writeValue(jsonFile5, rules);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_5",
"catalog_5", "database_5", "table_5", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("user table-level whitelist rule",
reason1.get());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_6",
"catalog_6", "database_6", "table_6", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("default rule", reason2.get());
+ });
+
+ rules.clear();
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 6L, "user_6", "catalog_6", "database_6", "table_6", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile5, rules);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_5",
"catalog_5", "database_5", "table_5", reason3);
+ Assert.assertFalse(result3);
+ Assert.assertEquals("default rule", reason3.get());
+
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_6",
"catalog_6", "database_6", "table_6", reason4);
+ Assert.assertTrue(result4);
+ Assert.assertEquals("user table-level whitelist rule",
reason4.get());
+ });
+ }
+
+ private static void deleteDirectoryRecursively(Path directory) throws
IOException {
+ if (!Files.exists(directory)) {
+ return;
+ }
+
+ Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes
attrs) throws IOException {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException
exc) throws IOException {
+ if (exc != null) {
+ throw exc;
+ }
+ Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ @AfterClass
+ public static void deleteJsonFile() throws Exception {
+ Path currentDir = Paths.get("").toAbsolutePath();
+ Path jsonFileDir = currentDir.resolve("jsonFileDir-test");
+
+ if
(Files.exists(Paths.get(Config.file_cache_admission_control_json_dir))) {
+ deleteDirectoryRecursively(jsonFileDir);
+ }
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
index ab5205b47a7..28b2c604fdf 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
@@ -76,7 +76,8 @@ public class SplitAssignmentTest {
mockSplitGenerator,
mockSplitToScanRange,
locationProperties,
- pathPartitionKeys
+ pathPartitionKeys,
+ true
);
}
@@ -92,7 +93,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
@@ -127,7 +129,8 @@ public class SplitAssignmentTest {
mockSplitGenerator,
mockSplitToScanRange,
locationProperties,
- pathPartitionKeys
+ pathPartitionKeys,
+ true
);
new MockUp<SplitAssignment>() {
@@ -196,7 +199,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
@@ -226,7 +230,8 @@ public class SplitAssignmentTest {
result = batch;
minTimes = 0;
- mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
minTimes = 0;
}
@@ -257,7 +262,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
@@ -287,7 +293,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
@@ -339,7 +346,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
@@ -383,7 +391,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 159153e7736..615b4584bce 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -518,6 +518,7 @@ struct TFileRangeDesc {
14: optional i64 self_split_weight
// whether the value of columns_from_path is null
15: optional list<bool> columns_from_path_is_null;
+ 16: optional bool file_cache_admission;
}
struct TSplitSource {
diff --git a/tools/export_mysql_rule_to_json.sh
b/tools/export_mysql_rule_to_json.sh
new file mode 100755
index 00000000000..0c44ff09524
--- /dev/null
+++ b/tools/export_mysql_rule_to_json.sh
@@ -0,0 +1,83 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Exports file cache admission rules from a MySQL table into a timestamped
# JSON file (one rule object per array element) for FE admission control.

set -e

# Configuration
DB_HOST="localhost"
DB_USER="root"
DB_NAME="file_cache_admission_control"
DB_PASS=""
TABLE_NAME="admission_policy"
OUTPUT_FILE="rule_$(date +%Y%m%d_%H%M%S).json"

echo "=== Database Export Configuration ==="
echo "Database Host: $DB_HOST"
echo "Database User: $DB_USER"
echo "Database Name: $DB_NAME"
echo "Password: $(if [ -n "$DB_PASS" ]; then echo "Set"; else echo "Not set"; fi)"
echo "Table Name: $TABLE_NAME"
echo "Output File: $OUTPUT_FILE"
echo "====================================="
echo ""

# Query and convert to JSON (including long type timestamps)
QUERY=$(cat <<SQL
SELECT
    JSON_ARRAYAGG(
        JSON_OBJECT(
            'id', id,
            'user_identity', user_identity,
            'catalog_name', IFNULL(catalog_name, ''),
            'database_name', IFNULL(database_name, ''),
            'table_name', IFNULL(table_name, ''),
            'partition_pattern', IFNULL(partition_pattern, ''),
            'rule_type', rule_type,
            'enabled', CASE WHEN enabled = 1 THEN true ELSE false END,
            'created_time', UNIX_TIMESTAMP(created_time),
            'updated_time', UNIX_TIMESTAMP(updated_time)
        )
    ) AS json_data
FROM ${TABLE_NAME}
SQL
)

# Execute query.
# Fixes over the original:
#  - pass the password via MYSQL_PWD so it does not appear in `ps` output
#    (the -p<pass> form exposed it to every local user);
#  - drop the 2>/dev/null that silently hid real connection errors (it only
#    existed to mute the insecure-password warning triggered by -p<pass>);
#  - quote every expansion so hosts/users/names with special characters work.
if [ -n "$DB_PASS" ]; then
    JSON_DATA=$(echo "$QUERY" | MYSQL_PWD="$DB_PASS" mysql -h "$DB_HOST" -u "$DB_USER" "$DB_NAME" -N)
else
    JSON_DATA=$(echo "$QUERY" | mysql -h "$DB_HOST" -u "$DB_USER" "$DB_NAME" -N)
fi

# Handle NULL (JSON_ARRAYAGG returns SQL NULL when the table is empty)
if [ "$JSON_DATA" = "NULL" ] || [ -z "$JSON_DATA" ]; then
    JSON_DATA="[]"
fi

# Save to file
echo "$JSON_DATA" > "$OUTPUT_FILE"

# Pretty-print with jq when available, inserting a blank line between rules
if command -v jq &> /dev/null; then
    jq '.' "$OUTPUT_FILE" | awk '
    /^ {/ && NR > 3 {print ""}
    {print}
    ' > "${OUTPUT_FILE}.tmp" && mv "${OUTPUT_FILE}.tmp" "$OUTPUT_FILE"
fi

echo "Export completed: $OUTPUT_FILE"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]